This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new e63730e12 TIKA-4213 -- improve jdbc pipes reporter (#1669)
e63730e12 is described below

commit e63730e126e74b4ac36e5f2b8c6790963eb41c14
Author: Tim Allison <talli...@apache.org>
AuthorDate: Thu Mar 21 08:42:25 2024 -0400

    TIKA-4213 -- improve jdbc pipes reporter (#1669)
    
    * TIKA-4213 -- improve jdbc pipes reporter
---
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 52 ++++++++++++----------
 1 file changed, 29 insertions(+), 23 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index 0082eb9de..ee52bf80f 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -22,6 +22,8 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -68,7 +70,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     private String connectionString;
 
     private Optional<String> postConnectionString = Optional.empty();
-    private final ArrayBlockingQueue<KeyStatusPair> queue =
+    private final ArrayBlockingQueue<IdStatusPair> queue =
             new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
     CompletableFuture<Void> reportWorkerFuture;
 
@@ -146,7 +148,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             return;
         }
         try {
-            queue.offer(new KeyStatusPair(t.getEmitKey().getEmitKey(), 
result.getStatus()),
+            queue.offer(new IdStatusPair(t.getId(), result.getStatus()),
                     MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             //swallow
@@ -167,7 +169,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
     @Override
     public void close() throws IOException {
         try {
-            queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
+            queue.offer(IdStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             return;
         }
@@ -186,20 +188,20 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
     }
 
-    private static class KeyStatusPair {
+    private static class IdStatusPair {
 
-        static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null);
-        private final String emitKey;
+        static IdStatusPair END_SEMAPHORE = new IdStatusPair(null, null);
+        private final String id;
         private final PipesResult.STATUS status;
 
-        public KeyStatusPair(String emitKey, PipesResult.STATUS status) {
-            this.emitKey = emitKey;
+        public IdStatusPair(String id, PipesResult.STATUS status) {
+            this.id = id;
             this.status = status;
         }
 
         @Override
         public String toString() {
-            return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", 
status=" + status + '}';
+            return "KeyStatusPair{" + "id='" + id + '\'' + ", status=" + 
status + '}';
         }
     }
 
@@ -208,18 +210,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         private static final int MAX_TRIES = 3;
         private final String connectionString;
         private final Optional<String> postConnectionString;
-        private final ArrayBlockingQueue<KeyStatusPair> queue;
+        private final ArrayBlockingQueue<IdStatusPair> queue;
         private final int cacheSize;
         private final long reportWithinMs;
 
-        List<KeyStatusPair> cache = new ArrayList<>();
+        List<IdStatusPair> cache = new ArrayList<>();
         private Connection connection;
         private PreparedStatement insert;
 
 
         public ReportWorker(String connectionString,
                             Optional<String> postConnectionString,
-                            ArrayBlockingQueue<KeyStatusPair> queue, int 
cacheSize,
+                            ArrayBlockingQueue<IdStatusPair> queue, int 
cacheSize,
                             long reportWithinMs) {
             this.connectionString = connectionString;
             this.postConnectionString = postConnectionString;
@@ -242,18 +244,19 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         public void run() {
             long lastReported = System.currentTimeMillis();
             while (true) {
-                //blocking
-                KeyStatusPair p = null;
+                IdStatusPair p = null;
                 try {
-                    p = queue.take();
+                    p = queue.poll(reportWithinMs, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     return;
                 }
-                if (p == KeyStatusPair.END_SEMAPHORE) {
-                    shutdownNow();
-                    return;
+                if (p != null) {
+                    if (p == IdStatusPair.END_SEMAPHORE) {
+                        shutdownNow();
+                        return;
+                    }
+                    cache.add(p);
                 }
-                cache.add(p);
                 long elapsed = System.currentTimeMillis() - lastReported;
 
                 if (cache.size() >= cacheSize || elapsed > reportWithinMs) {
@@ -296,10 +299,11 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             int attempt = 0;
             while (++attempt < MAX_TRIES) {
                 try {
-                    for (KeyStatusPair p : cache) {
+                    for (IdStatusPair p : cache) {
                         insert.clearParameters();
-                        insert.setString(1, p.emitKey);
+                        insert.setString(1, p.id);
                         insert.setString(2, p.status.name());
+                        insert.setTimestamp(3, Timestamp.from(Instant.now()));
                         insert.addBatch();
                     }
                     insert.executeBatch();
@@ -317,7 +321,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             try (Statement st = connection.createStatement()) {
                 String sql = "drop table if exists " + TABLE_NAME;
                 st.execute(sql);
-                sql = "create table " + TABLE_NAME + " (path varchar(1024), 
status varchar(32))";
+                sql = "create table " + TABLE_NAME + " (id varchar(1024), 
status varchar(32), " +
+                        "timestamp timestamp with time zone)";
                 st.execute(sql);
             }
         }
@@ -370,7 +375,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
 
         private void createPreparedStatement() throws SQLException {
-            String sql = "insert into " + TABLE_NAME + " (path, status) values 
(?,?)";
+            //do we want to do an upsert?
+            String sql = "insert into " + TABLE_NAME + " (id, status, 
timestamp) values (?,?,?)";
             insert = connection.prepareStatement(sql);
         }
     }

Reply via email to