This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
commit 4b114a31aee19ce921be4151bf75e4212596754b Author: tballison <talli...@apache.org> AuthorDate: Thu Nov 17 16:36:27 2022 -0500 Improve closing resources in JDBCPipesReporter, update unit tests and rename columns to align with jdbc emitter --- .../pipes/reporters/jdbc/JDBCPipesReporter.java | 19 +++++++-- .../reporters/jdbc/TestJDBCPipesReporter.java | 47 +++++++++++++++++----- .../{ => configs}/tika-config-excludes.xml | 2 +- .../{ => configs}/tika-config-includes.xml | 2 +- 4 files changed, 53 insertions(+), 17 deletions(-) diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java index f29c2f986..835b50ec1 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java @@ -60,8 +60,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl private static final long MAX_WAIT_MILLIS = 120000; private String connectionString; - private ArrayBlockingQueue<KeyStatusPair> queue = - new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE); + private final ArrayBlockingQueue<KeyStatusPair> queue = + new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE); CompletableFuture<Void> reportWorkerFuture; @Override @@ -186,6 +186,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl return; } if (p == KeyStatusPair.END_SEMAPHORE) { + LOG.trace("received end semaphore"); try { reportNow(); } catch (SQLException e) { @@ -193,6 +194,15 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl } catch (InterruptedException e) { return; } + LOG.trace("about to close"); + try { + insert.close(); + connection.close(); + LOG.trace("successfully closed resources"); + } catch (SQLException e) { + LOG.warn("problem shutting down reporter", e); + } + return; } cache.add(p); @@ -220,6 +230,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl insert.addBatch(); } insert.executeBatch(); + LOG.debug("writing {} " + cache.size()); cache.clear(); return; } catch (SQLException e) { @@ -233,7 +244,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl try (Statement st = connection.createStatement()) { String sql = "drop table if exists " + TABLE_NAME; st.execute(sql); - sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))"; + sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))"; st.execute(sql); } } @@ -262,7 +273,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl } private void createPreparedStatement() throws SQLException { - String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)"; + String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)"; insert = connection.prepareStatement(sql); } } diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java index 188e262a9..75f9fe36e 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java @@ -22,8 +22,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -42,7 +44,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import org.apache.tika.pipes.FetchEmitTuple; import org.apache.tika.pipes.PipesReporter; @@ -55,16 +59,20 @@ import org.apache.tika.pipes.pipesiterator.TotalCountResult; public class TestJDBCPipesReporter { @Test - public void testBasic() throws Exception { + public void testBasic(@TempDir Path tmpDir) throws Exception { + Files.createDirectories(tmpDir.resolve("db")); + Path dbDir = tmpDir.resolve("db/h2"); + String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath(); + int numThreads = 10; int numIterations = 200; - String connectionString = "jdbc:h2:mem:test_tika"; JDBCPipesReporter reporter = new JDBCPipesReporter(); reporter.setConnection(connectionString); reporter.initialize(new HashMap<>()); Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); reporter.close(); + Map<PipesResult.STATUS, Long> total = countReported(connectionString); assertEquals(expected.size(), total.size()); long sum = 0; @@ -77,13 +85,19 @@ public class TestJDBCPipesReporter { } @Test - public void testIncludes() throws Exception { - Path p = Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI()); - AsyncConfig asyncConfig = AsyncConfig.load(p); + public void testIncludes(@TempDir Path tmpDir) throws Exception { + Files.createDirectories(tmpDir.resolve("db")); + Path dbDir = tmpDir.resolve("db/h2"); + Path config = tmpDir.resolve("tika-config.xml"); + String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath(); + + writeConfig("/configs/tika-config-includes.xml", + connectionString, config); + + AsyncConfig asyncConfig = AsyncConfig.load(config); PipesReporter reporter = asyncConfig.getPipesReporter(); int numThreads = 10; int numIterations = 200; - String connectionString = "jdbc:h2:mem:test_tika"; Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); reporter.close(); @@ -103,13 +117,18 @@ public class TestJDBCPipesReporter { } @Test - public void testExcludes() throws Exception { - Path p = Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI()); - AsyncConfig asyncConfig = AsyncConfig.load(p); + public void testExcludes(@TempDir Path tmpDir) throws Exception { + Files.createDirectories(tmpDir.resolve("db")); + Path dbDir = tmpDir.resolve("db/h2"); + Path config = tmpDir.resolve("tika-config.xml"); + String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath(); + + writeConfig("/configs/tika-config-excludes.xml", + connectionString, config); + AsyncConfig asyncConfig = AsyncConfig.load(config); PipesReporter reporter = asyncConfig.getPipesReporter(); int numThreads = 10; int numIterations = 200; - String connectionString = "jdbc:h2:mem:test_tika"; Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); reporter.close(); @@ -235,4 +254,10 @@ public class TestJDBCPipesReporter { return written; } } + + private void writeConfig(String srcConfig, String dbDir, Path config) throws IOException { + String xml = IOUtils.resourceToString(srcConfig, StandardCharsets.UTF_8); + xml = xml.replace("CONNECTION_STRING", dbDir); + Files.write(config, xml.getBytes(StandardCharsets.UTF_8)); + } } diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml similarity index 96% rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml index f35b27952..ab7682237 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-excludes.xml @@ -35,7 +35,7 @@ </params> <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter"> <params> - <connection>jdbc:h2:mem:test_tika</connection> + <connection>CONNECTION_STRING</connection> <excludes> <exclude>PARSE_SUCCESS</exclude> <exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude> diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml similarity index 96% rename from tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml rename to tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml index fa7c74fcf..1c3c68663 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-includes.xml @@ -35,7 +35,7 @@ </params> <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter"> <params> - <connection>jdbc:h2:mem:test_tika</connection> + <connection>CONNECTION_STRING</connection> <includes> <include>PARSE_SUCCESS</include> <include>PARSE_SUCCESS_WITH_EXCEPTION</include>