This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 87ace2f08 TIKA-3986 (#1012)
87ace2f08 is described below
commit 87ace2f0894b929b77ec77477daf180cb73b8307
Author: Tim Allison <[email protected]>
AuthorDate: Tue Mar 14 12:01:16 2023 -0400
TIKA-3986 (#1012)
* TIKA-3986 -- strip \u0000 for postgres varchar and make column type
handling more efficient
---
.../tika/pipes/emitter/jdbc/JDBCEmitter.java | 225 +++++++++++++++------
.../tika/pipes/emitter/jdbc/JDBCEmitterTest.java | 38 ++++
.../configs/tika-config-jdbc-emitter-trunc.xml | 44 ++++
3 files changed, 249 insertions(+), 58 deletions(-)
diff --git
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
index e51351430..e72d6ba19 100644
---
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
+++
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java
@@ -82,7 +82,7 @@ public class JDBCEmitter extends AbstractEmitter implements
Initializable, Close
//some file formats do not have time zones...
//try both
private static final String[] TIKA_DATE_PATTERNS =
- new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
+ new String[] {"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"};
//the "write" lock is used for creating the table
private static ReadWriteLock READ_WRITE_LOCK = new
ReentrantReadWriteLock();
//this keeps track of which table + connection string have been created
@@ -99,7 +99,11 @@ public class JDBCEmitter extends AbstractEmitter implements
Initializable, Close
private int maxRetries = 0;
+ //used only for specification of column name/string definition of
+ //keys
private Map<String, String> keys;
+
+ private List<ColumnDefinition> columns;
private Connection connection;
private PreparedStatement insertStatement;
private AttachmentStrategy attachmentStrategy =
AttachmentStrategy.FIRST_ONLY;
@@ -113,6 +117,11 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
//multithreaded, this will be a big problem.
private final DateFormat[] dateFormats;
+ private int maxStringLength = 64000;
+
+ //this is set during the initialize phase
+ private StringNormalizer stringNormalizer;
+
public JDBCEmitter() {
dateFormats = new DateFormat[TIKA_DATE_PATTERNS.length];
int i = 0;
@@ -147,6 +156,18 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
this.connectionString = connection;
}
+ /**
+ * Sets the maximum string length in characters (not bytes).
+ * This applies only to columns whose type is declared as "string",
+ * not to columns declared as "varchar(n)", which carry their own length.
+ *
+ * @param maxStringLength maximum number of characters kept for "string" columns
+ */
+ @Field
+ public void setMaxStringLength(int maxStringLength) {
+ this.maxStringLength = maxStringLength;
+ }
+
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
@@ -293,8 +314,8 @@ public class JDBCEmitter extends AbstractEmitter implements
Initializable, Close
int col = 0;
insertStatement.setString(++col, emitKey);
insertStatement.setInt(++col, i);
- for (Map.Entry<String, String> e : keys.entrySet()) {
- updateValue(insertStatement, ++col, e.getKey(), e.getValue(),
i, metadataList);
+ for (ColumnDefinition columnDefinition : columns) {
+ updateValue(emitKey, insertStatement, ++col, columnDefinition,
i, metadataList);
}
insertStatement.addBatch();
}
@@ -308,8 +329,8 @@ public class JDBCEmitter extends AbstractEmitter implements
Initializable, Close
dateFormats[i] = new SimpleDateFormat(TIKA_DATE_PATTERNS[j],
Locale.US);
}
insertStatement.setString(++i, emitKey);
- for (Map.Entry<String, String> e : keys.entrySet()) {
- updateValue(insertStatement, ++i, e.getKey(), e.getValue(), 0,
metadataList);
+ for (ColumnDefinition columnDefinition : columns) {
+ updateValue(emitKey, insertStatement, ++i, columnDefinition, 0,
metadataList);
}
}
@@ -356,55 +377,52 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
}
}
- private void updateValue(PreparedStatement insertStatement, int i, String
key, String type,
- int metadataListIndex, List<Metadata>
metadataList)
+ private void updateValue(String emitKey, PreparedStatement
insertStatement, int i,
+ ColumnDefinition columnDefinition, int
metadataListIndex,
+ List<Metadata> metadataList)
throws SQLException {
Metadata metadata = metadataList.get(metadataListIndex);
- String val = getVal(metadata, key, type);
- String lcType = type.toLowerCase(Locale.US);
- if (lcType.startsWith("varchar")) {
- updateVarchar(key, lcType, insertStatement, i, val);
- return;
- }
- switch (lcType) {
- case "string":
- updateString(insertStatement, i, val);
+ String val = getVal(metadata, columnDefinition);
+ switch (columnDefinition.getType()) {
+ case Types.VARCHAR:
+ updateVarchar(emitKey, columnDefinition, insertStatement, i,
val);
break;
- case "bool":
- case "boolean":
+ case Types.BOOLEAN:
updateBoolean(insertStatement, i, val);
break;
- case "int":
- case "integer":
+ case Types.INTEGER:
updateInteger(insertStatement, i, val);
break;
- case "bigint":
- case "long":
+ case Types.BIGINT:
updateLong(insertStatement, i, val);
break;
- case "float":
+ case Types.FLOAT:
updateFloat(insertStatement, i, val);
break;
- case "double":
+ case Types.DOUBLE:
updateDouble(insertStatement, i, val);
break;
- case "timestamp":
+ case Types.TIMESTAMP:
updateTimestamp(insertStatement, i, val, dateFormats);
break;
default:
- throw new IllegalArgumentException("Can only process:
'string', 'boolean', 'int' " +
- "and 'long' types so far. Please open a ticket to
request: " + type);
+ throw new IllegalArgumentException(
+ "Can only process:" + getHandledTypes() +
+ " types so far. " +
+ "Please open a ticket to request: " +
+ columnDefinition.getType() + " for " +
+ columnDefinition.getColumnName());
}
}
- private String getVal(Metadata metadata, String key, String type) {
- if (!type.equals("string") && !type.startsWith("varchar")) {
- return metadata.get(key);
+ private String getVal(Metadata metadata, ColumnDefinition
columnDefinition) {
+ if (columnDefinition.getType() != Types.VARCHAR) {
+ return metadata.get(columnDefinition.getColumnName());
}
if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) {
- return metadata.get(key);
+ return metadata.get(columnDefinition.getColumnName());
}
- String[] vals = metadata.getValues(key);
+ String[] vals = metadata.getValues(columnDefinition.getColumnName());
if (vals.length == 0) {
return null;
} else if (vals.length == 1) {
@@ -413,7 +431,7 @@ public class JDBCEmitter extends AbstractEmitter implements
Initializable, Close
int i = 0;
StringBuilder sb = new StringBuilder();
- for (String val : metadata.getValues(key)) {
+ for (String val :
metadata.getValues(columnDefinition.getColumnName())) {
if (StringUtils.isBlank(val)) {
continue;
}
@@ -436,25 +454,16 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
insertStatement.setDouble(i, d);
}
- private void updateVarchar(String key, String type, PreparedStatement
insertStatement, int i,
+ private void updateVarchar(String emitKey, ColumnDefinition
columnDefinition, PreparedStatement insertStatement,
+ int i,
String val) throws SQLException {
- if (StringUtils.isBlank(val)) {
- updateString(insertStatement, i, val);
- return;
- }
- Matcher m = Pattern.compile("varchar\\((\\d+)\\)").matcher(type);
- if (m.find()) {
- int len = Integer.parseInt(m.group(1));
- if (val.length() > len) {
- int origLength = val.length();
- val = val.substring(0, len);
- LOGGER.warn("truncating varchar ({}) from {} to {}", key,
origLength, len);
- }
- updateString(insertStatement, i, val);
+ if (val == null) {
+ insertStatement.setNull(i, Types.VARCHAR);
return;
}
- LOGGER.warn("couldn't parse varchar?! {}", type);
- updateString(insertStatement, i, null);
+ String normalized = stringNormalizer.normalize(emitKey,
+ columnDefinition.getColumnName(), val,
columnDefinition.getPrecision());
+ insertStatement.setString(i, normalized);
}
private void updateTimestamp(PreparedStatement insertStatement, int i,
String val,
@@ -513,18 +522,11 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
}
}
- private void updateString(PreparedStatement insertStatement, int i, String
val)
- throws SQLException {
- if (val == null) {
- insertStatement.setNull(i, Types.VARCHAR);
- } else {
- insertStatement.setString(i, val);
- }
- }
@Override
public void initialize(Map<String, Param> params) throws
TikaConfigException {
-
+ parseColTypes();
+ setStringNormalizer();
try {
createConnection();
} catch (SQLException e) {
@@ -557,6 +559,21 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
}
}
+    /**
+     * Selects the string normalizer from the configured connection string:
+     * the postgres-specific normalizer when it starts with "jdbc:postgres",
+     * the generic normalizer otherwise.
+     */
+    private void setStringNormalizer() {
+        boolean isPostgres = connectionString.startsWith("jdbc:postgres");
+        stringNormalizer = isPostgres
+                ? new JDBCEmitter.PostgresNormalizer()
+                : new JDBCEmitter.StringNormalizer();
+    }
+
+    /**
+     * Rebuilds {@code columns} from the configured {@code keys}: one parsed
+     * column definition per entry, in the iteration order of the map.
+     */
+    private void parseColTypes() {
+        columns = new ArrayList<>();
+        keys.forEach((columnName, typeString) ->
+                columns.add(ColumnDefinition.parse(columnName, typeString, maxStringLength)));
+    }
+
@Override
public void checkInitialization(InitializableProblemHandler problemHandler)
throws TikaConfigException {
@@ -580,4 +597,96 @@ public class JDBCEmitter extends AbstractEmitter
implements Initializable, Close
}
}
+    /**
+     * @return the quoted, comma-separated list of type names that is
+     * embedded in the "unsupported type" error messages
+     */
+    private static String getHandledTypes() {
+        final String stringTypes = "'string', 'varchar', ";
+        final String remainingTypes =
+                "'boolean', 'int', 'long', 'float', 'double' and 'timestamp'";
+        return stringTypes + remainingTypes;
+    }
+
+    /**
+     * Default normalizer for values written to string/varchar columns.
+     * It only enforces the maximum length of the column.
+     */
+    private static class StringNormalizer {
+
+        /**
+         * Truncates {@code s} to at most {@code maxLength} characters.
+         *
+         * @param emitKey emit key of the record; used only in the log message
+         * @param columnName column being written; used only in the log message
+         * @param s value to normalize; must not be null
+         * @param maxLength maximum number of characters; a negative value
+         *                  disables truncation
+         * @return {@code s} unchanged if it fits, otherwise its first
+         * {@code maxLength} characters
+         */
+        String normalize(String emitKey, String columnName, String s, int maxLength) {
+            //a value that is exactly maxLength characters long already fits:
+            //return it as is rather than logging a no-op "truncation"
+            if (maxLength < 0 || s.length() <= maxLength) {
+                return s;
+            }
+            LOGGER.warn("truncating {}->'{}' from {} chars to {} chars",
+                    emitKey, columnName, s.length(), maxLength);
+
+            return s.substring(0, maxLength);
+        }
+    }
+
+    /**
+     * Normalizer used for postgres connection strings. In addition to the
+     * length limit of the parent class, it replaces every NUL (U+0000)
+     * character with a space.
+     */
+    private static class PostgresNormalizer extends StringNormalizer {
+
+        /**
+         * Replaces NUL characters with spaces, then applies the length limit.
+         * The replacement is one-for-one, so it does not change the length.
+         *
+         * @param emitKey emit key of the record; used only in the log message
+         * @param columnName column being written; used only in the log message
+         * @param s value to normalize; must not be null
+         * @param maxLength maximum number of characters; a negative value
+         *                  disables truncation
+         * @return the cleaned and, if necessary, truncated value
+         */
+        @Override
+        String normalize(String emitKey, String columnName, String s, int maxLength) {
+            //char-based replace gives the same result as the regex-based
+            //replaceAll without compiling a pattern for every value
+            s = s.replace('\u0000', ' ');
+            return super.normalize(emitKey, columnName, s, maxLength);
+        }
+    }
+
+    /**
+     * Immutable description of one configured column: the column/metadata
+     * key name, its {@link java.sql.Types} code and, for string columns,
+     * the maximum length.
+     */
+    private static class ColumnDefinition {
+        //a Pattern is immutable and safe to share; a Matcher is mutable and
+        //is not, so a fresh Matcher is created for every parse() call
+        private static final Pattern VARCHAR_PATTERN =
+                Pattern.compile("varchar\\((\\d+)\\)");
+
+        private final String columnName;
+
+        private final int type;
+        //this is only used (so far) for varchar. It is currently
+        //ignored for other data types
+        private final int precision;
+
+        /**
+         * Parses the type string configured for a key into a column definition.
+         * The type string is matched case-insensitively.
+         *
+         * @param name column name
+         * @param type configured type string, e.g. "varchar(12)", "string", "int"
+         * @param maxStringLength precision to use for columns of type "string"
+         * @return the parsed definition; precision is -1 for non-string types
+         * @throws IllegalArgumentException if the type string is not supported
+         */
+        private static ColumnDefinition parse(String name, String type, int maxStringLength) {
+            String lcType = type.toLowerCase(Locale.US);
+            Matcher varcharMatcher = VARCHAR_PATTERN.matcher(lcType);
+            if (varcharMatcher.find()) {
+                return new ColumnDefinition(name, Types.VARCHAR,
+                        Integer.parseInt(varcharMatcher.group(1)));
+            }
+
+            switch (lcType) {
+
+                case "string":
+                    return new ColumnDefinition(name, Types.VARCHAR, maxStringLength);
+                case "bool":
+                case "boolean":
+                    return new ColumnDefinition(name, Types.BOOLEAN, -1);
+                case "int":
+                case "integer":
+                    return new ColumnDefinition(name, Types.INTEGER, -1);
+                case "bigint":
+                case "long":
+                    return new ColumnDefinition(name, Types.BIGINT, -1);
+                case "float":
+                    return new ColumnDefinition(name, Types.FLOAT, -1);
+                case "double":
+                    return new ColumnDefinition(name, Types.DOUBLE, -1);
+                case "timestamp":
+                    return new ColumnDefinition(name, Types.TIMESTAMP, -1);
+
+                default:
+                    throw new IllegalArgumentException(
+                            "Can only process: " + getHandledTypes() +
+                                    " types so far. Please open a ticket to request " +
+                                    type + " for column: " + name);
+            }
+        }
+
+        private ColumnDefinition(String columnName, int type, int precision) {
+            this.columnName = columnName;
+            this.type = type;
+            this.precision = precision;
+        }
+
+        /**
+         * @return the column name, which is also the metadata key
+         */
+        public String getColumnName() {
+            return columnName;
+        }
+
+        /**
+         * @return the {@link java.sql.Types} code of the column
+         */
+        public int getType() {
+            return type;
+        }
+
+        /**
+         * @return the maximum length for string columns, -1 for other types
+         */
+        public int getPrecision() {
+            return precision;
+        }
+    }
+
}
diff --git
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
index 873c885fd..97b8d2e0f 100644
---
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
+++
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitterTest.java
@@ -17,6 +17,7 @@
package org.apache.tika.pipes.emitter.jdbc;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
@@ -216,6 +217,43 @@ public class JDBCEmitterTest {
assertEquals(1, rows);
}
+    /**
+     * Emits three records whose k1 values are 4, 19 and 11 characters long
+     * using the truncation test config, then checks that every stored value
+     * is shorter than 13 characters and that three rows were written.
+     */
+    @Test
+    public void testVarcharTruncation(@TempDir Path tmpDir) throws Exception {
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-jdbc-emitter-trunc.xml",
+                connectionString, config);
+
+        Emitter emitter = EmitterManager.load(config).getEmitter();
+        String[] k1Values = {"abcd", "abcdefghijklmnopqrs", "abcdefghijk"};
+        for (int id = 0; id < k1Values.length; id++) {
+            emitter.emit("id" + id,
+                    Collections.singletonList(m(new String[]{"k1", k1Values[id]})));
+        }
+
+        int rows = 0;
+        try (Connection connection = DriverManager.getConnection(connectionString);
+                Statement st = connection.createStatement();
+                ResultSet rs = st.executeQuery("select * from test")) {
+            while (rs.next()) {
+                String stored = rs.getString(2);
+                assertTrue(stored.length() < 13);
+                //'m' is the 13th character of the long value, so it can
+                //only be present if the value was not cut at 12 characters
+                assertFalse(stored.contains("m"));
+                rows++;
+            }
+        }
+        assertEquals(3, rows);
+    }
+
private void writeConfig(String srcConfig, String dbDir, Path config)
throws IOException {
String xml = IOUtils.resourceToString(srcConfig,
StandardCharsets.UTF_8);
xml = xml.replace("CONNECTION_STRING", dbDir);
diff --git
a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
new file mode 100644
index 000000000..85eef281c
--- /dev/null
+++
b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/test/resources/configs/tika-config-jdbc-emitter-trunc.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<properties>
+ <emitters>
+ <emitter class="org.apache.tika.pipes.emitter.jdbc.JDBCEmitter">
+ <name>jdbc</name>
+ <connection>CONNECTION_STRING</connection>
+ <createTable>create table test
+ (path varchar(512) primary key,
+ k1 varchar(12));
+ </createTable>
+ <!-- the jdbc emitter always puts the emitKey value as the first
+ item -->
+ <insert>insert into test (path, k1) values (?,?);
+ </insert>
+ <!-- these are the keys in the metadata object.
+ The emitKey is added as the first element in the insert statement.
+ Then these values are added in order.
+ They must be in the order of the insert statement.
+ -->
+ <keys>
+ <key k="k1" v="varchar(12)"/>
+ </keys>
+ <attachmentStrategy>first_only</attachmentStrategy>
+ </emitter>
+ </emitters>
+</properties>
\ No newline at end of file