kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490


##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java:
##########
@@ -74,7 +74,7 @@ public Properties topicConfig() {
     public void 
maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> 
topicPartitions) {
         JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition 
-> {
             List<BrokerLocalStorage> localStorages = 
JavaConverters.bufferAsJavaList(brokers()).stream()
-                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), 
b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+                    .map(b -> new BrokerLocalStorage(b.config().brokerId(), 
JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))

Review Comment:
   ditto



##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
     private final Integer brokerId;
-    private final File brokerStorageDirectory;
+    private final Set<File> brokerStorageDirectorys;

Review Comment:
   nit: `brokerStorageDirectorys` -> `brokerStorageDirectories`
   



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3287,11 +3287,9 @@ class ReplicaManagerTest {
     val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
     val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
     if (enableRemoteStorage) {

Review Comment:
   nit: do we need this `if` check?



##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -141,7 +146,11 @@ private boolean 
isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition
         if (offsetToSearch.equals(firstLogFileBaseOffset)) {
             return true;
         }
-        File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), 
topicPartition.toString());
+        File partitionDir = brokerStorageDirectorys.stream()
+                .filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+                .findFirst()
+                .orElseThrow(() -> new 
IllegalArgumentException(String.format("[BrokerId=%d] Directory for the 
topic-partition %s " +
+                "was not found", brokerId, topicPartition)));

Review Comment:
   previously, we were returning the `partitionDir` instead of  `logDir`:
   
   ```suggestion
   File logDir = brokerStorageDirectorys.stream()
                   .filter(dir -> dirContainsTopicPartition(topicPartition, 
dir))
                   .findFirst()
                   .orElseThrow(() -> new 
IllegalArgumentException(String.format("[BrokerId=%d] Directory for the 
topic-partition %s " +
                   "was not found", brokerId, topicPartition)));
   File partitionDir = new File(logDir.getAbsolutePath(), 
topicPartition.toString());
   ```



##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##########
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
     private final Integer brokerId;
-    private final File brokerStorageDirectory;
+    private final Set<File> brokerStorageDirectorys;
     private final Integer storageWaitTimeoutSec;
 
     private final int storagePollPeriodSec = 1;
     private final Time time = Time.SYSTEM;
 
     public BrokerLocalStorage(Integer brokerId,
-                              String storageDirname,
+                              Set<String> storageDirnames,
                               Integer storageWaitTimeoutSec) {
         this.brokerId = brokerId;
-        this.brokerStorageDirectory = new File(storageDirname);
+        this.brokerStorageDirectorys = 
storageDirnames.stream().map(File::new).collect(Collectors.toSet());
         this.storageWaitTimeoutSec = storageWaitTimeoutSec;
     }
 
     public Integer getBrokerId() {
         return brokerId;
     }
 
+    public Set<File> getBrokerStorageDirectory() {

Review Comment:
   rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories`



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String 
topic,
         return this;
     }
 
+    public TieredStorageTestBuilder alterLogDir(String topic,
+                                                    Integer partition,

Review Comment:
   nit: parameter alignment



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -154,7 +154,7 @@ public static List<LocalTieredStorage> 
remoteStorageManagers(Seq<KafkaBroker> br
     @SuppressWarnings("deprecation")
     public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> 
brokers) {
         return JavaConverters.seqAsJavaList(brokers).stream()
-                .map(b -> new BrokerLocalStorage(b.config().brokerId(), 
b.config().logDirs().head(),
+                .map(b -> new BrokerLocalStorage(b.config().brokerId(), 
JavaConverters.asJava(b.config().logDirs().toSet()),

Review Comment:
   The build will fail to compile with scala 2.12 when `JavaConverters.asJava` 
is used:
   
   ```
   ./gradlew clean :storage:build -PscalaVersion=2.12
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to