This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 87ace2f08 TIKA-3986 (#1012)
87ace2f08 is described below

commit 87ace2f0894b929b77ec77477daf180cb73b8307
Author: Tim Allison <talli...@apache.org>
AuthorDate: Tue Mar 14 12:01:16 2023 -0400

    TIKA-3986 (#1012)
    
    * TIKA-3986 -- strip \u0000 for postgres varchar and make column type handling more efficient
---
 .../tika/pipes/emitter/jdbc/JDBCEmitter.java       | 225 +++++++++++++++------
 .../tika/pipes/emitter/jdbc/JDBCEmitterTest.java   |  38 ++++
 .../configs/tika-config-jdbc-emitter-trunc.xml     |  44 ++++
 3 files changed, 249 insertions(+), 58 deletions(-)

diff --git 
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index e51351430..e72d6ba19 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -82,7 +82,7 @@ public class JDBCEmitter extends AbstractEmitter implements 
Initializable, Close
     //some file formats do not have time zones...
     //try both
     private static final String[] TIKA_DATE_PATTERNS =
-            new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
+            new String[] {"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
     //the "write" lock is used for creating the table
     private static ReadWriteLock READ_WRITE_LOCK = new 
ReentrantReadWriteLock();
     //this keeps track of which table + connection string have been created
@@ -99,7 +99,11 @@ public class JDBCEmitter extends AbstractEmitter implements 
Initializable, Close
 
     private int maxRetries = 0;
 
+    //used only for specification of column name/string definition of
+    //keys
     private Map<String, String> keys;
+
+    private List<ColumnDefinition> columns;
     private Connection connection;
     private PreparedStatement insertStatement;
     private AttachmentStrategy attachmentStrategy = 
AttachmentStrategy.FIRST_ONLY;
@@ -113,6 +117,11 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
     //multithreaded, this will be a big problem.
     private final DateFormat[] dateFormats;
 
+    private int maxStringLength = 64000;
+
+    //this is set during the initialize phase
+    private StringNormalizer stringNormalizer;
+
     public JDBCEmitter() {
         dateFormats = new DateFormat[TIKA_DATE_PATTERNS.length];
         int i = 0;
@@ -147,6 +156,18 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         this.connectionString = connection;
     }
 
+    /**
+     * Set the maximum string length in characters (not bytes).
+     * This applies only to fields of type &quot;string&quot;,
+     * not to &quot;varchar&quot;.
+     *
+     * @param maxStringLength
+     */
+    @Field
+    public void setMaxStringLength(int maxStringLength) {
+        this.maxStringLength = maxStringLength;
+    }
+
     public void setMaxRetries(int maxRetries) {
         this.maxRetries = maxRetries;
     }
@@ -293,8 +314,8 @@ public class JDBCEmitter extends AbstractEmitter implements 
Initializable, Close
             int col = 0;
             insertStatement.setString(++col, emitKey);
             insertStatement.setInt(++col, i);
-            for (Map.Entry<String, String> e : keys.entrySet()) {
-                updateValue(insertStatement, ++col, e.getKey(), e.getValue(), 
i, metadataList);
+            for (ColumnDefinition columnDefinition : columns) {
+                updateValue(emitKey, insertStatement, ++col, columnDefinition, 
i, metadataList);
             }
             insertStatement.addBatch();
         }
@@ -308,8 +329,8 @@ public class JDBCEmitter extends AbstractEmitter implements 
Initializable, Close
             dateFormats[i] = new SimpleDateFormat(TIKA_DATE_PATTERNS[j], 
Locale.US);
         }
         insertStatement.setString(++i, emitKey);
-        for (Map.Entry<String, String> e : keys.entrySet()) {
-            updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, 
metadataList);
+        for (ColumnDefinition columnDefinition : columns) {
+            updateValue(emitKey, insertStatement, ++i, columnDefinition, 0, 
metadataList);
         }
     }
 
@@ -356,55 +377,52 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         }
     }
 
-    private void updateValue(PreparedStatement insertStatement, int i, String 
key, String type,
-                             int metadataListIndex, List<Metadata> 
metadataList)
+    private void updateValue(String emitKey, PreparedStatement 
insertStatement, int i,
+                             ColumnDefinition columnDefinition, int 
metadataListIndex,
+                             List<Metadata> metadataList)
             throws SQLException {
         Metadata metadata = metadataList.get(metadataListIndex);
-        String val = getVal(metadata, key, type);
-        String lcType = type.toLowerCase(Locale.US);
-        if (lcType.startsWith("varchar")) {
-            updateVarchar(key, lcType, insertStatement, i, val);
-            return;
-        }
-        switch (lcType) {
-            case "string":
-                updateString(insertStatement, i, val);
+        String val = getVal(metadata, columnDefinition);
+        switch (columnDefinition.getType()) {
+            case Types.VARCHAR:
+                updateVarchar(emitKey, columnDefinition, insertStatement, i, 
val);
                 break;
-            case "bool":
-            case "boolean":
+            case Types.BOOLEAN:
                 updateBoolean(insertStatement, i, val);
                 break;
-            case "int":
-            case "integer":
+            case Types.INTEGER:
                 updateInteger(insertStatement, i, val);
                 break;
-            case "bigint":
-            case "long":
+            case Types.BIGINT:
                 updateLong(insertStatement, i, val);
                 break;
-            case "float":
+            case Types.FLOAT:
                 updateFloat(insertStatement, i, val);
                 break;
-            case "double":
+            case Types.DOUBLE:
                 updateDouble(insertStatement, i, val);
                 break;
-            case "timestamp":
+            case Types.TIMESTAMP:
                 updateTimestamp(insertStatement, i, val, dateFormats);
                 break;
             default:
-                throw new IllegalArgumentException("Can only process: 
'string', 'boolean', 'int' " +
-                        "and 'long' types so far.  Please open a ticket to 
request: " + type);
+                throw new IllegalArgumentException(
+                        "Can only process:" + getHandledTypes() +
+                                " types so far.  " +
+                                "Please open a ticket to request: " +
+                                columnDefinition.getType() + " for " +
+                                columnDefinition.getColumnName());
         }
     }
 
-    private String getVal(Metadata metadata, String key, String type) {
-        if (!type.equals("string") && !type.startsWith("varchar")) {
-            return metadata.get(key);
+    private String getVal(Metadata metadata, ColumnDefinition 
columnDefinition) {
+        if (columnDefinition.getType() != Types.VARCHAR) {
+            return metadata.get(columnDefinition.getColumnName());
         }
         if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) {
-            return metadata.get(key);
+            return metadata.get(columnDefinition.getColumnName());
         }
-        String[] vals = metadata.getValues(key);
+        String[] vals = metadata.getValues(columnDefinition.getColumnName());
         if (vals.length == 0) {
             return null;
         } else if (vals.length == 1) {
@@ -413,7 +431,7 @@ public class JDBCEmitter extends AbstractEmitter implements 
Initializable, Close
 
         int i = 0;
         StringBuilder sb = new StringBuilder();
-        for (String val : metadata.getValues(key)) {
+        for (String val : 
metadata.getValues(columnDefinition.getColumnName())) {
             if (StringUtils.isBlank(val)) {
                 continue;
             }
@@ -436,25 +454,16 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         insertStatement.setDouble(i, d);
     }
 
-    private void updateVarchar(String key, String type, PreparedStatement 
insertStatement, int i,
+    private void updateVarchar(String emitKey, ColumnDefinition 
columnDefinition, PreparedStatement insertStatement,
+                               int i,
                                String val) throws SQLException {
-        if (StringUtils.isBlank(val)) {
-            updateString(insertStatement, i, val);
-            return;
-        }
-        Matcher m = Pattern.compile("varchar\\((\\d+)\\)").matcher(type);
-        if (m.find()) {
-            int len = Integer.parseInt(m.group(1));
-            if (val.length() > len) {
-                int origLength = val.length();
-                val = val.substring(0, len);
-                LOGGER.warn("truncating varchar ({}) from {} to {}", key, 
origLength, len);
-            }
-            updateString(insertStatement, i, val);
+        if (val == null) {
+            insertStatement.setNull(i, Types.VARCHAR);
             return;
         }
-        LOGGER.warn("couldn't parse varchar?! {}", type);
-        updateString(insertStatement, i, null);
+        String normalized = stringNormalizer.normalize(emitKey,
+                columnDefinition.getColumnName(), val, 
columnDefinition.getPrecision());
+        insertStatement.setString(i, normalized);
     }
 
     private void updateTimestamp(PreparedStatement insertStatement, int i, 
String val,
@@ -513,18 +522,11 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         }
     }
 
-    private void updateString(PreparedStatement insertStatement, int i, String 
val)
-            throws SQLException {
-        if (val == null) {
-            insertStatement.setNull(i, Types.VARCHAR);
-        } else {
-            insertStatement.setString(i, val);
-        }
-    }
 
     @Override
     public void initialize(Map<String, Param> params) throws 
TikaConfigException {
-
+        parseColTypes();
+        setStringNormalizer();
         try {
             createConnection();
         } catch (SQLException e) {
@@ -557,6 +559,21 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         }
     }
 
+    private void setStringNormalizer() {
+        if (connectionString.startsWith("jdbc:postgres")) {
+            stringNormalizer = new JDBCEmitter.PostgresNormalizer();
+        } else {
+            stringNormalizer = new JDBCEmitter.StringNormalizer();
+        }
+    }
+
+    private void parseColTypes() {
+        columns = new ArrayList<>();
+        for (Map.Entry<String, String> e : keys.entrySet()) {
+            columns.add(ColumnDefinition.parse(e.getKey(), e.getValue(), 
maxStringLength));
+        }
+    }
+
     @Override
     public void checkInitialization(InitializableProblemHandler problemHandler)
             throws TikaConfigException {
@@ -580,4 +597,96 @@ public class JDBCEmitter extends AbstractEmitter 
implements Initializable, Close
         }
     }
 
+    private static String getHandledTypes() {
+        return "'string', 'varchar', " +
+                "'boolean', 'int', 'long', 'float', 'double' and 'timestamp'";
+    }
+
+    private static class StringNormalizer {
+
+        String normalize(String emitKey, String columnName, String s, int 
maxLength) {
+            if (maxLength < 0 || s.length() < maxLength) {
+                return s;
+            }
+            LOGGER.warn("truncating {}->'{}' from {} chars to {} chars",
+                    emitKey, columnName, s.length(), maxLength);
+
+            return s.substring(0, maxLength);
+        }
+    }
+
+    private static class PostgresNormalizer extends StringNormalizer {
+
+        @Override
+        String normalize(String emitKey, String columnName, String s, int 
maxLength) {
+            s = s.replaceAll("\u0000", " ");
+            return super.normalize(emitKey, columnName, s, maxLength);
+        }
+    }
+
+    private static class ColumnDefinition {
+        private static final Matcher VARCHAR_MATCHER =
+                Pattern.compile("varchar\\((\\d+)\\)").matcher("");
+
+        private final String columnName;
+
+        private final int type;
+        //this is only used (so far) for varchar.  It is currently
+        //ignored for other data types
+        private final int precision;
+
+        private static ColumnDefinition parse(String name, String type, int 
maxStringLength) {
+            String lcType = type.toLowerCase(Locale.US);
+            if (VARCHAR_MATCHER.reset(lcType).find()) {
+                return new ColumnDefinition(name,
+                        Types.VARCHAR, 
Integer.parseInt(VARCHAR_MATCHER.group(1)));
+            }
+
+            switch (lcType) {
+
+                case "string":
+                    return new ColumnDefinition(name, Types.VARCHAR, 
maxStringLength);
+                case "bool":
+                case "boolean":
+                    return new ColumnDefinition(name, Types.BOOLEAN, -1);
+                case "int":
+                case "integer":
+                    return new ColumnDefinition(name, Types.INTEGER, -1);
+                case "bigint":
+                case "long":
+                    return new ColumnDefinition(name, Types.BIGINT, -1);
+                case "float":
+                    return new ColumnDefinition(name, Types.FLOAT, -1);
+                case "double":
+                    return new ColumnDefinition(name, Types.DOUBLE, -1);
+                case "timestamp":
+                    return new ColumnDefinition(name, Types.TIMESTAMP, -1);
+
+                default:
+                    throw new IllegalArgumentException(
+                            "Can only process: " + getHandledTypes() +
+                                    " types so far.  Please open a ticket to 
request " +
+                                    type + " for column: " + name);
+            }
+        }
+
+        private ColumnDefinition(String columnName, int type, int precision) {
+            this.columnName = columnName;
+            this.type = type;
+            this.precision = precision;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        public int getPrecision() {
+            return precision;
+        }
+    }
+
 }
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
index 873c885fd..97b8d2e0f 100644
--- 
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
+++ 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
@@ -17,6 +17,7 @@
 package org.apache.tika.pipes.emitter.jdbc;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.IOException;
@@ -216,6 +217,43 @@ public class JDBCEmitterTest {
         assertEquals(1, rows);
     }
 
+    @Test
+    public void testVarcharTruncation(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-jdbc-emitter-trunc.xml",
+                connectionString, config);
+
+        EmitterManager emitterManager = EmitterManager.load(config);
+        Emitter emitter = emitterManager.getEmitter();
+        List<String[]> data = new ArrayList<>();
+        data.add(new String[]{"k1", "abcd"});
+        data.add(new String[]{"k1", "abcdefghijklmnopqrs"});
+        data.add(new String[]{"k1", "abcdefghijk"});
+        int id = 0;
+        for (String[] d : data) {
+            emitter.emit("id" + id++, Collections.singletonList(m(d)));
+        }
+
+        int rows = 0;
+        try (Connection connection = 
DriverManager.getConnection(connectionString)) {
+            try (Statement st = connection.createStatement()) {
+                try (ResultSet rs = st.executeQuery("select * from test")) {
+                    while (rs.next()) {
+                        String s = rs.getString(2);
+                        assertTrue(s.length() < 13);
+                        assertFalse(s.contains("m"));
+                        rows++;
+                    }
+                }
+            }
+        }
+        assertEquals(3, rows);
+    }
+
     private void writeConfig(String srcConfig, String dbDir, Path config) 
throws IOException {
         String xml = IOUtils.resourceToString(srcConfig, 
StandardCharsets.UTF_8);
         xml = xml.replace("CONNECTION_STRING", dbDir);
diff --git 
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
new file mode 100644
index 000000000..85eef281c
--- /dev/null
+++ 
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<properties>
+  <emitters>
+    <emitter class="org.apache.tika.pipes.emitter.jdbc.JDBCEmitter">
+      <name>jdbc</name>
+      <connection>CONNECTION_STRING</connection>
+      <createTable>create table test
+        (path varchar(512) primary key,
+        k1 varchar(12));
+      </createTable>
+      <!-- the jdbc emitter always puts the emitKey value as the first
+           item -->
+      <insert>insert into test (path, k1) values (?,?);
+      </insert>
+      <!-- these are the keys in the metadata object.
+          The emitKey is added as the first element in the insert statement.
+          Then these values are added in order.
+          They must be in the order of the insert statement.
+          -->
+      <keys>
+        <key k="k1" v="varchar(12)"/>
+      </keys>
+      <attachmentStrategy>first_only</attachmentStrategy>
+    </emitter>
+  </emitters>
+</properties>
\ No newline at end of file

Reply via email to