This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push: new 87ace2f08 TIKA-3986 (#1012) 87ace2f08 is described below commit 87ace2f0894b929b77ec77477daf180cb73b8307 Author: Tim Allison <talli...@apache.org> AuthorDate: Tue Mar 14 12:01:16 2023 -0400 TIKA-3986 (#1012) * TIKA-3986 -- strip \u0000 for postgres varchar and make column type handling more efficient --- .../tika/pipes/emitter/jdbc/JDBCEmitter.java | 225 +++++++++++++++------ .../tika/pipes/emitter/jdbc/JDBCEmitterTest.java | 38 ++++ .../configs/tika-config-jdbc-emitter-trunc.xml | 44 ++++ 3 files changed, 249 insertions(+), 58 deletions(-) diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java index e51351430..e72d6ba19 100644 --- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java @@ -82,7 +82,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close //some file formats do not have time zones... 
//try both private static final String[] TIKA_DATE_PATTERNS = - new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"}; + new String[] {"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"}; //the "write" lock is used for creating the table private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock(); //this keeps track of which table + connection string have been created @@ -99,7 +99,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close private int maxRetries = 0; + //used only for specification of column name/string definition of + //keys private Map<String, String> keys; + + private List<ColumnDefinition> columns; private Connection connection; private PreparedStatement insertStatement; private AttachmentStrategy attachmentStrategy = AttachmentStrategy.FIRST_ONLY; @@ -113,6 +117,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close //multithreaded, this will be a big problem. private final DateFormat[] dateFormats; + private int maxStringLength = 64000; + + //this is set during the initialize phase + private StringNormalizer stringNormalizer; + public JDBCEmitter() { dateFormats = new DateFormat[TIKA_DATE_PATTERNS.length]; int i = 0; @@ -147,6 +156,18 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close this.connectionString = connection; } + /** + * Set the maximum string length in characters (not bytes). + * This applies only to fields with name "string" + * not to "varchar". 
+ * + * @param maxStringLength + */ + @Field + public void setMaxStringLength(int maxStringLength) { + this.maxStringLength = maxStringLength; + } + public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } @@ -293,8 +314,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close int col = 0; insertStatement.setString(++col, emitKey); insertStatement.setInt(++col, i); - for (Map.Entry<String, String> e : keys.entrySet()) { - updateValue(insertStatement, ++col, e.getKey(), e.getValue(), i, metadataList); + for (ColumnDefinition columnDefinition : columns) { + updateValue(emitKey, insertStatement, ++col, columnDefinition, i, metadataList); } insertStatement.addBatch(); } @@ -308,8 +329,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close dateFormats[i] = new SimpleDateFormat(TIKA_DATE_PATTERNS[j], Locale.US); } insertStatement.setString(++i, emitKey); - for (Map.Entry<String, String> e : keys.entrySet()) { - updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0, metadataList); + for (ColumnDefinition columnDefinition : columns) { + updateValue(emitKey, insertStatement, ++i, columnDefinition, 0, metadataList); } } @@ -356,55 +377,52 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } } - private void updateValue(PreparedStatement insertStatement, int i, String key, String type, - int metadataListIndex, List<Metadata> metadataList) + private void updateValue(String emitKey, PreparedStatement insertStatement, int i, + ColumnDefinition columnDefinition, int metadataListIndex, + List<Metadata> metadataList) throws SQLException { Metadata metadata = metadataList.get(metadataListIndex); - String val = getVal(metadata, key, type); - String lcType = type.toLowerCase(Locale.US); - if (lcType.startsWith("varchar")) { - updateVarchar(key, lcType, insertStatement, i, val); - return; - } - switch (lcType) { - case "string": - updateString(insertStatement, i, 
val); + String val = getVal(metadata, columnDefinition); + switch (columnDefinition.getType()) { + case Types.VARCHAR: + updateVarchar(emitKey, columnDefinition, insertStatement, i, val); break; - case "bool": - case "boolean": + case Types.BOOLEAN: updateBoolean(insertStatement, i, val); break; - case "int": - case "integer": + case Types.INTEGER: updateInteger(insertStatement, i, val); break; - case "bigint": - case "long": + case Types.BIGINT: updateLong(insertStatement, i, val); break; - case "float": + case Types.FLOAT: updateFloat(insertStatement, i, val); break; - case "double": + case Types.DOUBLE: updateDouble(insertStatement, i, val); break; - case "timestamp": + case Types.TIMESTAMP: updateTimestamp(insertStatement, i, val, dateFormats); break; default: - throw new IllegalArgumentException("Can only process: 'string', 'boolean', 'int' " + - "and 'long' types so far. Please open a ticket to request: " + type); + throw new IllegalArgumentException( + "Can only process:" + getHandledTypes() + + " types so far. 
" + + "Please open a ticket to request: " + + columnDefinition.getType() + " for " + + columnDefinition.getColumnName()); } } - private String getVal(Metadata metadata, String key, String type) { - if (!type.equals("string") && !type.startsWith("varchar")) { - return metadata.get(key); + private String getVal(Metadata metadata, ColumnDefinition columnDefinition) { + if (columnDefinition.getType() != Types.VARCHAR) { + return metadata.get(columnDefinition.getColumnName()); } if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) { - return metadata.get(key); + return metadata.get(columnDefinition.getColumnName()); } - String[] vals = metadata.getValues(key); + String[] vals = metadata.getValues(columnDefinition.getColumnName()); if (vals.length == 0) { return null; } else if (vals.length == 1) { @@ -413,7 +431,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close int i = 0; StringBuilder sb = new StringBuilder(); - for (String val : metadata.getValues(key)) { + for (String val : metadata.getValues(columnDefinition.getColumnName())) { if (StringUtils.isBlank(val)) { continue; } @@ -436,25 +454,16 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close insertStatement.setDouble(i, d); } - private void updateVarchar(String key, String type, PreparedStatement insertStatement, int i, + private void updateVarchar(String emitKey, ColumnDefinition columnDefinition, PreparedStatement insertStatement, + int i, String val) throws SQLException { - if (StringUtils.isBlank(val)) { - updateString(insertStatement, i, val); - return; - } - Matcher m = Pattern.compile("varchar\\((\\d+)\\)").matcher(type); - if (m.find()) { - int len = Integer.parseInt(m.group(1)); - if (val.length() > len) { - int origLength = val.length(); - val = val.substring(0, len); - LOGGER.warn("truncating varchar ({}) from {} to {}", key, origLength, len); - } - updateString(insertStatement, i, val); + if (val == null) { + 
insertStatement.setNull(i, Types.VARCHAR); return; } - LOGGER.warn("couldn't parse varchar?! {}", type); - updateString(insertStatement, i, null); + String normalized = stringNormalizer.normalize(emitKey, + columnDefinition.getColumnName(), val, columnDefinition.getPrecision()); + insertStatement.setString(i, normalized); } private void updateTimestamp(PreparedStatement insertStatement, int i, String val, @@ -513,18 +522,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } } - private void updateString(PreparedStatement insertStatement, int i, String val) - throws SQLException { - if (val == null) { - insertStatement.setNull(i, Types.VARCHAR); - } else { - insertStatement.setString(i, val); - } - } @Override public void initialize(Map<String, Param> params) throws TikaConfigException { - + parseColTypes(); + setStringNormalizer(); try { createConnection(); } catch (SQLException e) { @@ -557,6 +559,21 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } } + private void setStringNormalizer() { + if (connectionString.startsWith("jdbc:postgres")) { + stringNormalizer = new JDBCEmitter.PostgresNormalizer(); + } else { + stringNormalizer = new JDBCEmitter.StringNormalizer(); + } + } + + private void parseColTypes() { + columns = new ArrayList<>(); + for (Map.Entry<String, String> e : keys.entrySet()) { + columns.add(ColumnDefinition.parse(e.getKey(), e.getValue(), maxStringLength)); + } + } + @Override public void checkInitialization(InitializableProblemHandler problemHandler) throws TikaConfigException { @@ -580,4 +597,96 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } } + private static String getHandledTypes() { + return "'string', 'varchar', " + + "'boolean', 'int', 'long', 'float', 'double' and 'timestamp'"; + } + + private static class StringNormalizer { + + String normalize(String emitKey, String columnName, String s, int maxLength) { + if (maxLength < 0 || 
s.length() < maxLength) { + return s; + } + LOGGER.warn("truncating {}->'{}' from {} chars to {} chars", + emitKey, columnName, s.length(), maxLength); + + return s.substring(0, maxLength); + } + } + + private static class PostgresNormalizer extends StringNormalizer { + + @Override + String normalize(String emitKey, String columnName, String s, int maxLength) { + s = s.replaceAll("\u0000", " "); + return super.normalize(emitKey, columnName, s, maxLength); + } + } + + private static class ColumnDefinition { + private static final Matcher VARCHAR_MATCHER = + Pattern.compile("varchar\\((\\d+)\\)").matcher(""); + + private final String columnName; + + private final int type; + //this is only used (so far) for varchar. It is currently + //ignored for other data types + private final int precision; + + private static ColumnDefinition parse(String name, String type, int maxStringLength) { + String lcType = type.toLowerCase(Locale.US); + if (VARCHAR_MATCHER.reset(lcType).find()) { + return new ColumnDefinition(name, + Types.VARCHAR, Integer.parseInt(VARCHAR_MATCHER.group(1))); + } + + switch (lcType) { + + case "string": + return new ColumnDefinition(name, Types.VARCHAR, maxStringLength); + case "bool": + case "boolean": + return new ColumnDefinition(name, Types.BOOLEAN, -1); + case "int": + case "integer": + return new ColumnDefinition(name, Types.INTEGER, -1); + case "bigint": + case "long": + return new ColumnDefinition(name, Types.BIGINT, -1); + case "float": + return new ColumnDefinition(name, Types.FLOAT, -1); + case "double": + return new ColumnDefinition(name, Types.DOUBLE, -1); + case "timestamp": + return new ColumnDefinition(name, Types.TIMESTAMP, -1); + + default: + throw new IllegalArgumentException( + "Can only process: " + getHandledTypes() + + " types so far. 
Please open a ticket to request " + + type + " for column: " + name); + } + } + + private ColumnDefinition(String columnName, int type, int precision) { + this.columnName = columnName; + this.type = type; + this.precision = precision; + } + + public String getColumnName() { + return columnName; + } + + public int getType() { + return type; + } + + public int getPrecision() { + return precision; + } + } + } diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java index 873c885fd..97b8d2e0f 100644 --- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java @@ -17,6 +17,7 @@ package org.apache.tika.pipes.emitter.jdbc; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -216,6 +217,43 @@ public class JDBCEmitterTest { assertEquals(1, rows); } + @Test + public void testVarcharTruncation(@TempDir Path tmpDir) throws Exception { + Files.createDirectories(tmpDir.resolve("db")); + Path dbDir = tmpDir.resolve("db/h2"); + Path config = tmpDir.resolve("tika-config.xml"); + String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath(); + + writeConfig("/configs/tika-config-jdbc-emitter-trunc.xml", + connectionString, config); + + EmitterManager emitterManager = EmitterManager.load(config); + Emitter emitter = emitterManager.getEmitter(); + List<String[]> data = new ArrayList<>(); + data.add(new String[]{"k1", "abcd"}); + data.add(new String[]{"k1", "abcdefghijklmnopqrs"}); + data.add(new String[]{"k1", "abcdefghijk"}); + int id = 0; + for (String[] d : data) { 
+ emitter.emit("id" + id++, Collections.singletonList(m(d))); + } + + int rows = 0; + try (Connection connection = DriverManager.getConnection(connectionString)) { + try (Statement st = connection.createStatement()) { + try (ResultSet rs = st.executeQuery("select * from test")) { + while (rs.next()) { + String s = rs.getString(2); + assertTrue(s.length() < 13); + assertFalse(s.contains("m")); + rows++; + } + } + } + } + assertEquals(3, rows); + } + private void writeConfig(String srcConfig, String dbDir, Path config) throws IOException { String xml = IOUtils.resourceToString(srcConfig, StandardCharsets.UTF_8); xml = xml.replace("CONNECTION_STRING", dbDir); diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml new file mode 100644 index 000000000..85eef281c --- /dev/null +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<properties> + <emitters> + <emitter class="org.apache.tika.pipes.emitter.jdbc.JDBCEmitter"> + <name>jdbc</name> + <connection>CONNECTION_STRING</connection> + <createTable>create table test + (path varchar(512) primary key, + k1 varchar(12)); + </createTable> + <!-- the jdbc emitter always puts the emitKey value as the first + item --> + <insert>insert into test (path, k1) values (?,?); + </insert> + <!-- these are the keys in the metadata object. + The emitKey is added as the first element in the insert statement. + Then these values are added in order. + They must be in the order of the insert statement. + --> + <keys> + <key k="k1" v="varchar(12)"/> + </keys> + <attachmentStrategy>first_only</attachmentStrategy> + </emitter> + </emitters> +</properties> \ No newline at end of file