This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-3977 in repository https://gitbox.apache.org/repos/asf/tika.git
commit e8c978252380d4fef533492e650c10b1e41798bd Author: tallison <[email protected]> AuthorDate: Fri Feb 17 11:12:33 2023 -0500 TIKA-3977 -- add postconnection sql call, clean up closing of resources and add max retries to JDBCEmitter --- .../tika/pipes/emitter/jdbc/JDBCEmitter.java | 157 ++++++++++++++------- .../pipes/reporters/jdbc/JDBCPipesReporter.java | 92 +++++++++--- 2 files changed, 174 insertions(+), 75 deletions(-) diff --git a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java index 968cbb4ad..e51351430 100644 --- a/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java +++ b/tika-pipes/tika-emitters/tika-emitter-jdbc/src/main/java/org/apache/tika/pipes/emitter/jdbc/JDBCEmitter.java @@ -28,11 +28,13 @@ import java.sql.Types; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -50,6 +52,7 @@ import org.apache.tika.exception.TikaConfigException; import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.emitter.AbstractEmitter; import org.apache.tika.pipes.emitter.EmitData; +import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.pipes.emitter.TikaEmitterException; import org.apache.tika.utils.StringUtils; @@ -75,11 +78,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close FIRST_ONLY, CONCATENATE //anything else? } + //some file formats do not have time zones... //try both - private static final String[] TIKA_DATE_PATTERNS = new String[] { - "yyyy-MM-dd'T'HH:mm:ss'Z'","yyyy-MM-dd'T'HH:mm:ss" - }; + private static final String[] TIKA_DATE_PATTERNS = + new String[]{"yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd'T'HH:mm:ss"}; //the "write" lock is used for creating the table private static ReadWriteLock READ_WRITE_LOCK = new ReentrantReadWriteLock(); //this keeps track of which table + connection string have been created @@ -88,9 +91,14 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close //different jdbc emitters. private static Set<String> TABLES_CREATED = new HashSet<>(); private String connectionString; + + private Optional<String> postConnectionString = Optional.empty(); private String insert; private String createTable; private String alterTable; + + private int maxRetries = 0; + private Map<String, String> keys; private Connection connection; private PreparedStatement insertStatement; @@ -112,6 +120,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close dateFormats[i++] = new SimpleDateFormat(p, Locale.US); } } + /** * This is called immediately after the table is created. * The purpose of this is to allow for adding a complex primary key or @@ -134,8 +143,25 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } @Field - public void setConnection(String connectionString) { - this.connectionString = connectionString; + public void setConnection(String connection) { + this.connectionString = connection; + } + + public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + /** + * This sql will be called immediately after the connection is made. This was + * initially added for setting pragmas on sqlite3, but may be used for other + * connection configuration in other dbs. Note: This is called before the table is + * created if it needs to be created. + * + * @param postConnection + */ + @Field + public void setPostConnection(String postConnection) { + this.postConnectionString = Optional.of(postConnection); } /** @@ -143,7 +169,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close * a multivalued field in a metadata object, do you want the first value only * or should we concatenate these with the * {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)}. - * + * <p> * The default values as of 2.6.1 are {@link MultivaluedFieldStrategy#CONCATENATE} * and the default delimiter is ", " * @@ -169,7 +195,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close /** * See {@link JDBCEmitter#setMultivaluedFieldDelimiter(String)} - * @param delimiter + * + * @param delimiter */ @Field public void setMultivaluedFieldDelimiter(String delimiter) { @@ -209,7 +236,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close * This executes the emit with each call. For more efficient * batch execution use {@link #emit(List)}. * - * @param emitKey emit key + * @param emitKey emit key * @param metadataList list of metadata per file * @throws IOException * @throws TikaEmitterException @@ -220,52 +247,43 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close if (metadataList == null || metadataList.size() < 1) { return; } - try { - if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) { - insertFirstOnly(emitKey, metadataList); - insertStatement.execute(); - } else { - insertAll(emitKey, metadataList); - insertStatement.executeBatch(); - } - } catch (SQLException e) { - try { - LOGGER.warn("problem during emit; going to try to reconnect", e); - //something went wrong - //try to reconnect - reconnect(); - } catch (SQLException ex) { - throw new TikaEmitterException("Couldn't reconnect!", ex); - } - throw new TikaEmitterException("couldn't emit", e); - } + List<EmitData> emitDataList = new ArrayList<>(); + emitDataList.add(new EmitData(new EmitKey("", emitKey), metadataList)); + emit(emitDataList); } @Override public void emit(List<? extends EmitData> emitData) throws IOException, TikaEmitterException { - try { - if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) { - for (EmitData d : emitData) { - insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList()); - insertStatement.addBatch(); - } - } else { - for (EmitData d : emitData) { - insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList()); + int tries = 0; + Exception ex = null; + while (tries++ <= maxRetries) { + try { + emitNow(emitData); + return; + } catch (SQLException e) { + try { + reconnect(); + } catch (SQLException exc) { + throw new TikaEmitterException("couldn't reconnect!", exc); } + ex = e; } - insertStatement.executeBatch(); - } catch (SQLException e) { - try { - LOGGER.warn("problem during emit; going to try to reconnect", e); - //something went wrong - //try to reconnect - reconnect(); - } catch (SQLException ex) { - throw new TikaEmitterException("Couldn't reconnect!", ex); + } + throw new TikaEmitterException("Couldn't emit " + emitData.size() + " records.", ex); + } + + private void emitNow(List<? extends EmitData> emitData) throws SQLException { + if (attachmentStrategy == AttachmentStrategy.FIRST_ONLY) { + for (EmitData d : emitData) { + insertFirstOnly(d.getEmitKey().getEmitKey(), d.getMetadataList()); + insertStatement.addBatch(); + } + } else { + for (EmitData d : emitData) { + insertAll(d.getEmitKey().getEmitKey(), d.getMetadataList()); } - throw new TikaEmitterException("couldn't emit", e); } + insertStatement.executeBatch(); } private void insertAll(String emitKey, List<Metadata> metadataList) throws SQLException { @@ -299,7 +317,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close SQLException ex = null; for (int i = 0; i < 3; i++) { try { - connection = DriverManager.getConnection(connectionString); + tryClose(); + createConnection(); insertStatement = connection.prepareStatement(insert); return; } catch (SQLException e) { @@ -310,6 +329,33 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close throw ex; } + private void tryClose() { + if (insertStatement != null) { + try { + insertStatement.close(); + } catch (SQLException e) { + LOGGER.warn("exception closing insert", e); + } + } + + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOGGER.warn("exception closing connection", e); + } + } + } + + private void createConnection() throws SQLException { + connection = DriverManager.getConnection(connectionString); + if (postConnectionString.isPresent()) { + try (Statement st = connection.createStatement()) { + st.execute(postConnectionString.get()); + } + } + } + private void updateValue(PreparedStatement insertStatement, int i, String key, String type, int metadataListIndex, List<Metadata> metadataList) throws SQLException { @@ -352,7 +398,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } private String getVal(Metadata metadata, String key, String type) { - if (! type.equals("string") && ! type.startsWith("varchar")) { + if (!type.equals("string") && !type.startsWith("varchar")) { return metadata.get(key); } if (multivaluedFieldStrategy == MultivaluedFieldStrategy.FIRST_ONLY) { @@ -380,7 +426,8 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close return sb.toString(); } - private void updateDouble(PreparedStatement insertStatement, int i, String val) throws SQLException { + private void updateDouble(PreparedStatement insertStatement, int i, String val) + throws SQLException { if (StringUtils.isBlank(val)) { insertStatement.setNull(i, Types.DOUBLE); return; @@ -390,8 +437,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } private void updateVarchar(String key, String type, PreparedStatement insertStatement, int i, - String val) - throws SQLException { + String val) throws SQLException { if (StringUtils.isBlank(val)) { updateString(insertStatement, i, val); return; @@ -480,7 +526,7 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close public void initialize(Map<String, Param> params) throws TikaConfigException { try { - connection = DriverManager.getConnection(connectionString); + createConnection(); } catch (SQLException e) { throw new TikaConfigException("couldn't open connection: " + connectionString, e); } @@ -509,8 +555,6 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close } catch (SQLException e) { throw new TikaConfigException("can't create insert statement", e); } - - } @Override @@ -524,6 +568,11 @@ public class JDBCEmitter extends AbstractEmitter implements Initializable, Close */ @Override public void close() throws IOException { + try { + insertStatement.close(); + } catch (SQLException e) { + LOGGER.warn("problem closing insert", e); + } try { connection.close(); } catch (SQLException e) { diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java index 835b50ec1..febb9cc26 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java @@ -25,6 +25,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -60,6 +61,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl private static final long MAX_WAIT_MILLIS = 120000; private String connectionString; + + private Optional<String> postConnectionString = Optional.empty(); private final ArrayBlockingQueue<KeyStatusPair> queue = new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE); CompletableFuture<Void> reportWorkerFuture; @@ -70,7 +73,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl if (StringUtils.isBlank(connectionString)) { throw new TikaConfigException("Must specify a connectionString"); } - ReportWorker reportWorker = new ReportWorker(connectionString, queue); + ReportWorker reportWorker = new ReportWorker(connectionString, postConnectionString, queue); reportWorker.init(); reportWorkerFuture = CompletableFuture.runAsync(reportWorker); } @@ -87,6 +90,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl this.connectionString = connection; } + /** + * This sql will be called immediately after the connection is made. This was + * initially added for setting pragmas on sqlite3, but may be used for other + * connection configuration in other dbs. Note: This is called before the table is + * created if it needs to be created. + * + * @param postConnection + */ + @Field + public void setPostConnection(String postConnection) { + this.postConnectionString = Optional.of(postConnection); + } @Override public void report(FetchEmitTuple t, PipesResult result, long elapsed) { @@ -155,13 +170,17 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl private static final int MAX_TRIES = 3; private final String connectionString; + private final Optional<String> postConnectionString; private final ArrayBlockingQueue<KeyStatusPair> queue; List<KeyStatusPair> cache = new ArrayList<>(); private Connection connection; private PreparedStatement insert; - public ReportWorker(String connectionString, ArrayBlockingQueue<KeyStatusPair> queue) { + public ReportWorker(String connectionString, + Optional<String> postConnectionString, + ArrayBlockingQueue<KeyStatusPair> queue) { this.connectionString = connectionString; + this.postConnectionString = postConnectionString; this.queue = queue; } @@ -186,23 +205,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl return; } if (p == KeyStatusPair.END_SEMAPHORE) { - LOG.trace("received end semaphore"); - try { - reportNow(); - } catch (SQLException e) { - throw new RuntimeException(e); - } catch (InterruptedException e) { - return; - } - LOG.trace("about to close"); - try { - insert.close(); - connection.close(); - LOG.trace("successfully closed resources"); - } catch (SQLException e) { - LOG.warn("problem shutting down reporter", e); - } - + shutdownNow(); return; } cache.add(p); @@ -219,6 +222,29 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl } + private void shutdownNow() { + LOG.trace("received end semaphore"); + try { + reportNow(); + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + return; + } + LOG.trace("about to close"); + try { + insert.close(); + } catch (SQLException e) { + LOG.warn("problem shutting down insert statement in reporter", e); + } + try { + connection.close(); + } catch (SQLException e) { + LOG.warn("problem shutting down connection in reporter", e); + } + LOG.trace("successfully closed resources"); + } + private void reportNow() throws SQLException, InterruptedException { int attempt = 0; while (++attempt < MAX_TRIES) { @@ -254,22 +280,46 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl SQLException ex = null; while (++attempts < 3) { try { + tryClose(); createConnection(); createPreparedStatement(); + LOG.debug("success reconnecting after {} attempts", attempts); return; } catch (SQLException e) { LOG.warn("problem reconnecting", e); - //if there's a failure, wait 10 seconds + //if there's a failure, wait 30 seconds //and hope the db is back up. - Thread.sleep(10000); + Thread.sleep(30000); ex = e; } } throw ex; } + private void tryClose() { + if (insert != null) { + try { + insert.close(); + } catch (SQLException e) { + LOG.warn("exception closing insert statement", insert); + } + } + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.warn("exception closing connection", e); + } + } + } + private void createConnection() throws SQLException { connection = DriverManager.getConnection(connectionString); + if (postConnectionString.isPresent()) { + try (Statement st = connection.createStatement()) { + st.execute(postConnectionString.get()); + } + } } private void createPreparedStatement() throws SQLException {
