frankgh commented on code in PR #107:
URL: https://github.com/apache/cassandra-analytics/pull/107#discussion_r2047936596


##########
scripts/build-dtest-jars.sh:
##########
@@ -31,14 +31,14 @@ else
   # moving back to a release tag would be.
   # Examples
   # a tagged release of Cassandra 4.0
-  #   "cassandra-4.0:cassandra-4.0.12"
+  #   "cassandra-4.0:cassandra-4.0.17"
    # a hash that points to a commit on the cassandra-4.0 branch
   #   "cassandra-4.0:1f79c8492528f01bcc5f88951a1cc9e0d7265c54"
   # the cassandra-4.0 branch - used for nightly integration test runs or local 
testing of new features
   #   "cassandra-4.0:cassandra-4.0"
   # Due to MacOS being stuck on Bash < 4, we don't use associative arrays here.
   CANDIDATE_BRANCHES=(
-    "cassandra-4.0:cassandra-4.0.12"
+    "cassandra-4.0:cassandra-4.0.17"

Review Comment:
   Unrelated, but for 4.1, should we also use the latest released version (4.1.8)?



##########
cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/bridge/SSTableWriterImplementation.java:
##########
@@ -69,94 +56,56 @@ public SSTableWriterImplementation(String inDirectory,
                                        @NotNull Set<String> 
userDefinedTypeStatements,
                                        int bufferSizeMB)
     {
-        this(inDirectory, partitioner, createStatement, insertStatement, 
userDefinedTypeStatements, bufferSizeMB, 10);
+        this(inDirectory, determineSupportedPartitioner(partitioner), 
createStatement, insertStatement, userDefinedTypeStatements, bufferSizeMB);
     }
 
     @VisibleForTesting
-    SSTableWriterImplementation(String inDirectory,
-                                String partitioner,
-                                String createStatement,
-                                String insertStatement,
-                                @NotNull Set<String> userDefinedTypeStatements,
-                                int bufferSizeMB,
-                                long sstableWatcherDelaySeconds)
+    public SSTableWriterImplementation(String inDirectory,
+                                       IPartitioner partitioner,
+                                       String createStatement,
+                                       String insertStatement,
+                                       @NotNull Set<String> 
userDefinedTypeStatements,
+                                       int bufferSizeMB)
     {
-        IPartitioner cassPartitioner = 
partitioner.toLowerCase().contains("random") ? new RandomPartitioner()
-                                                                               
     : new Murmur3Partitioner();
-
         this.writer = configureBuilder(inDirectory,
                                        createStatement,
                                        insertStatement,
                                        bufferSizeMB,
                                        userDefinedTypeStatements,
-                                       cassPartitioner)
+                                       this::onSSTablesProduced,
+                                       partitioner)
                       .build();
-        this.outputDir = Paths.get(inDirectory);
-        this.sstableWatcher = new SSTableWatcher(sstableWatcherDelaySeconds);
     }
 
-    private class SSTableWatcher implements Closeable
+    private static IPartitioner determineSupportedPartitioner(String 
partitioner)
     {
-        // The TOC component is the last one flushed when finishing a SSTable.
-        // Therefore, it monitors the creation of the TOC component to 
determine the creation of SSTable
-        private static final String TOC_COMPONENT_SUFFIX = "-TOC.txt";
-        private static final String GLOB_PATTERN_FOR_TOC = "*" + 
TOC_COMPONENT_SUFFIX;
-
-        private final ScheduledExecutorService sstableWatcherScheduler;
-        private final Set<SSTableDescriptor> knownSSTables;
-
-        SSTableWatcher(long delaySeconds)
-        {
-            ThreadFactory tf = ThreadUtil.threadFactory("SSTableWatcher-" + 
outputDir.getFileName().toString());
-            this.sstableWatcherScheduler = 
Executors.newSingleThreadScheduledExecutor(tf);
-            this.knownSSTables = new HashSet<>();
-            sstableWatcherScheduler.scheduleWithFixedDelay(this::listSSTables, 
delaySeconds, delaySeconds, TimeUnit.SECONDS);
-        }
+        return partitioner.toLowerCase().contains("random")
+               ? new RandomPartitioner()
+               : new Murmur3Partitioner();
+    }
 
-        private void listSSTables()
-        {
-            try (DirectoryStream<Path> stream = 
Files.newDirectoryStream(outputDir, GLOB_PATTERN_FOR_TOC))
-            {
-                HashSet<SSTableDescriptor> newlyProducedSSTables = new 
HashSet<>();
-                stream.forEach(path -> {
-                    String baseFilename = 
path.getFileName().toString().replace(TOC_COMPONENT_SUFFIX, "");
-                    SSTableDescriptor sstable = new 
SSTableDescriptor(baseFilename);
-                    if (!knownSSTables.contains(sstable))
-                    {
-                        newlyProducedSSTables.add(sstable);
-                    }
-                });
-
-                if (!newlyProducedSSTables.isEmpty())
-                {
-                    knownSSTables.addAll(newlyProducedSSTables);
-                    producedSSTablesListener.accept(newlyProducedSSTables);
-                }
-            }
-            catch (IOException e)
-            {
-                LOGGER.warn("Fails to list SSTables", e);
-            }
-        }
+    private void onSSTablesProduced(Collection<SSTableReader> sstables)
+    {
+        Objects.requireNonNull(producedSSTablesListener, 
"producedSSTablesListener is not set");
+        Set<SSTableDescriptor> sstableDescriptors = sstables
+                                                    .stream()
+                                                    .map(sstable -> {
+                                                        String baseFilename = 
baseFilename(sstable.descriptor);
+                                                        // TODO: for now, the 
sstableReader is closed immediately,

Review Comment:
   Should we do this as part of this PR? I'd argue that it is desirable. Is there
any reason not to do it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to