clolov commented on code in PR #14285:
URL: https://github.com/apache/kafka/pull/14285#discussion_r1305505703


##########
core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala:
##########
@@ -293,12 +294,9 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     TestUtils.deleteTopicWithAdmin(createAdminClient(), testTopicName, brokers)
     assertThrowsException(classOf[UnknownTopicOrPartitionException],
       () => TestUtils.describeTopic(createAdminClient(), testTopicName), "Topic should be deleted")
-
-    // FIXME: It seems the storage manager is being instantiated in different class loader so couldn't verify the value
-    //  but ensured it by adding a log statement in the storage manager (manually).
-    //    assertEquals(numPartitions * MyRemoteLogMetadataManager.segmentCount,
-    //      MyRemoteStorageManager.deleteSegmentEventCounter.get(),
-    //      "Remote log segments should be deleted only once by the leader")
+    TestUtils.waitUntilTrue(() =>
+      numPartitions * MyRemoteLogMetadataManager.segmentCount == MyRemoteStorageManager.deleteSegmentEventCounter.get(),

Review Comment:
   Can you change the `segmentCount` property of `MyRemoteLogMetadataManager` to `segmentCountPerPartition`?



##########
storage/src/test/java/org/apache/kafka/server/log/remote/storage/LocalTieredStorage.java:
##########
@@ -307,7 +307,7 @@ public Optional<CustomMetadata> copyLogSegmentData(final RemoteLogSegmentMetadat
             throws RemoteStorageException {
         Callable<Optional<CustomMetadata>> callable = () -> {
             final RemoteLogSegmentId id = metadata.remoteLogSegmentId();

Review Comment:
   On line 316 you can change `id.topicIdPartition()` to `metadata.topicIdPartition()` and remove this line.
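
   Presumably this works because `RemoteLogSegmentMetadata#topicIdPartition()` simply delegates to `remoteLogSegmentId().topicIdPartition()`, making the two expressions interchangeable. A minimal stand-in sketch of that assumption (the classes below are simplified stubs, not the real Kafka types):

```java
// Simplified stubs, assuming RemoteLogSegmentMetadata#topicIdPartition()
// delegates to remoteLogSegmentId().topicIdPartition().
class TopicIdPartition { }

class RemoteLogSegmentId {
    private final TopicIdPartition topicIdPartition;
    RemoteLogSegmentId(TopicIdPartition tp) { this.topicIdPartition = tp; }
    TopicIdPartition topicIdPartition() { return topicIdPartition; }
}

class RemoteLogSegmentMetadata {
    private final RemoteLogSegmentId id;
    RemoteLogSegmentMetadata(RemoteLogSegmentId id) { this.id = id; }
    RemoteLogSegmentId remoteLogSegmentId() { return id; }
    // The delegation that makes metadata.topicIdPartition() equivalent
    // to metadata.remoteLogSegmentId().topicIdPartition().
    TopicIdPartition topicIdPartition() { return remoteLogSegmentId().topicIdPartition(); }
}

public class TopicIdPartitionDelegation {
    public static void main(String[] args) {
        RemoteLogSegmentMetadata metadata =
            new RemoteLogSegmentMetadata(new RemoteLogSegmentId(new TopicIdPartition()));
        // Both expressions resolve to the same object:
        System.out.println(metadata.topicIdPartition() == metadata.remoteLogSegmentId().topicIdPartition());
    }
}
```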



##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java:
##########
@@ -138,18 +140,21 @@ private boolean compare(ByteBuffer lhs,
     private SimpleRecord convert(Object recordCandidate) {
         if (recordCandidate instanceof ProducerRecord) {
             ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate;
+            long timestamp = record.timestamp() == null ? RecordBatch.NO_TIMESTAMP : record.timestamp();
             ByteBuffer keyBytes =
                     Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
             ByteBuffer valueBytes =
                     Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
-            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+            Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+            return new SimpleRecord(timestamp, keyBytes, valueBytes, headers);
         } else if (recordCandidate instanceof ConsumerRecord) {
             ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) recordCandidate;
             ByteBuffer keyBytes =
                     Utils.wrapNullable(keySerde.serializer().serialize(topicPartition.topic(), (K) record.key()));
             ByteBuffer valueBytes =
                     Utils.wrapNullable(valueSerde.serializer().serialize(topicPartition.topic(), (V) record.value()));
-            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, record.headers().toArray());
+            Header[] headers = record.headers() != null ? record.headers().toArray() : Record.EMPTY_HEADERS;
+            return new SimpleRecord(record.timestamp(), keyBytes, valueBytes, headers);

Review Comment:
   For my understanding, why is the null check on the record's timestamp not needed here, when it was needed above?
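
   If the reason is that `ConsumerRecord#timestamp()` returns a primitive `long` while `ProducerRecord#timestamp()` returns a nullable boxed `Long`, a stand-in sketch of that distinction (the `Stub*` classes below are hypothetical mirrors of the assumed client signatures, not the real Kafka types):

```java
// Hypothetical stubs mirroring the (assumed) client signatures:
// ProducerRecord#timestamp() returns a boxed Long that may be null,
// whereas ConsumerRecord#timestamp() returns a primitive long.
class StubProducerRecord {
    private final Long timestamp; // nullable: the caller may omit it
    StubProducerRecord(Long timestamp) { this.timestamp = timestamp; }
    Long timestamp() { return timestamp; }
}

class StubConsumerRecord {
    private final long timestamp; // primitive, so it can never be null
    StubConsumerRecord(long timestamp) { this.timestamp = timestamp; }
    long timestamp() { return timestamp; }
}

public class TimestampNullability {
    static final long NO_TIMESTAMP = -1L; // stand-in for RecordBatch.NO_TIMESTAMP

    public static void main(String[] args) {
        StubProducerRecord produced = new StubProducerRecord(null);
        // Producer side: the null check is required before unboxing.
        long ts = produced.timestamp() == null ? NO_TIMESTAMP : produced.timestamp();
        // Consumer side: no check is possible or needed on a primitive.
        StubConsumerRecord consumed = new StubConsumerRecord(123L);
        System.out.println(ts + " " + consumed.timestamp());
    }
}
```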



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -309,7 +310,5 @@ public void printReport(PrintStream output) {
 
     @Override
     public void close() throws IOException {
-        Utils.closeAll(producer, consumer);

Review Comment:
   For my understanding, why are these safe to remove?



##########
storage/src/test/java/org/apache/kafka/tiered/storage/utils/RecordsKeyValueMatcher.java:
##########
@@ -138,18 +140,21 @@ private boolean compare(ByteBuffer lhs,
     private SimpleRecord convert(Object recordCandidate) {
         if (recordCandidate instanceof ProducerRecord) {
             ProducerRecord<?, ?> record = (ProducerRecord<?, ?>) recordCandidate;
+            long timestamp = record.timestamp() == null ? RecordBatch.NO_TIMESTAMP : record.timestamp();

Review Comment:
   Nit: Could all the conditions use either `== null` or `!= null` consistently, rather than mixing the two?
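
   To illustrate the nit (a hypothetical sketch, not the PR's code): the two ternary orderings are equivalent, so picking one form throughout keeps the style uniform.

```java
// Hypothetical helper showing the two equivalent null-check styles;
// choosing one ordering (here `!= null`, matching the headers check)
// keeps the conditions consistent.
public class NullCheckStyle {
    static final long NO_TIMESTAMP = -1L; // stand-in for RecordBatch.NO_TIMESTAMP

    static long normalize(Long timestamp) {
        // `timestamp == null ? NO_TIMESTAMP : timestamp` behaves identically.
        return timestamp != null ? timestamp : NO_TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println(normalize(null)); // falls back to NO_TIMESTAMP
        System.out.println(normalize(42L));
    }
}
```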



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
