yashmayya commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1239720643

##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {

Review Comment:
```suggestion
    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws Exception {
```
nit: since this is just a test method, we can avoid the verbosity


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,59 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone(s)-containing to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing to secondary store interrupted, cancelling", this);

Review Comment:
`Flush of tombstone(s)-containing to secondary store`

Should this be `Flush of tombstone(s)-containing offsets batch to secondary store` instead? 😅

Also I'm wondering if all of these log lines need to necessarily contain this verbiage around tombstone containing offsets? Users probably don't really need to know (or care) that offset flushes are handled differently internally depending on whether or not they contain one or more `null` valued offsets. IMO, just including the details of the failure should be sufficient - WDYT?


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,8 +284,59 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone(s)-containing to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case EOS is disabled, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (exactlyOnce) {
+                    secondaryWriteFuture.get();
+                } else {
+                    secondaryWriteFuture.get(offsetFlushTimeoutMs, TimeUnit.MILLISECONDS);
+                }
+            } catch (InterruptedException e) {
+                log.warn("{} Flush of tombstone(s)-containing to secondary store interrupted, cancelling", this);
+                secondaryStoreTombstoneWriteError.compareAndSet(null, e);
+            } catch (ExecutionException e) {
+                log.error("{} Flush of tombstone(s)-containing offsets to secondary store threw an unexpected exception: ", this, e);
+                secondaryStoreTombstoneWriteError.compareAndSet(null, e);

Review Comment:
```suggestion
                secondaryStoreTombstoneWriteError.compareAndSet(null, e.getCause());
```
Since the `ExecutionException` will be wrapping the real exception


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -16,23 +16,36 @@
  */
 package org.apache.kafka.connect.storage;
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.powermock.reflect.Whitebox;

Review Comment:
We shouldn't be using EasyMock / PowerMock anymore (https://issues.apache.org/jira/browse/KAFKA-7438).
Previous usages in this test class were removed in https://github.com/apache/kafka/pull/11450


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##########
@@ -192,6 +209,236 @@ public void testCancelAfterAwaitFlush() throws Exception {
         flushFuture.get(1000, TimeUnit.MILLISECONDS);
     }
+    @Test
+    public void testFlushFailureWhenWriteToSecondaryStoreFailsForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesSucceedToBothPrimaryAndSecondaryStoresForTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWriteToSecondaryStoreFailsForNonTombstoneOffsets() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesToPrimaryAndSecondaryStoreSucceeds() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceeds() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsAndSecondarySucceedsForTombstoneRecords() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+        KafkaOffsetBackingStore workerStore = setupOffsetBackingStoreWithProducer("topic2", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withConnectorAndWorkerStores(
+                () -> LoggingContext.forConnector("source-connector"),
+                workerStore,
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testFlushSuccessWhenWritesToPrimaryStoreSucceedsWithNoSecondaryStore() throws InterruptedException, TimeoutException, ExecutionException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", false);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                () -> LoggingContext.forConnector("source-connector"),
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        offsetStorageWriter.doFlush((error, result) -> {
+            assertNull(error);
+            assertNull(result);
+        }).get(1000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testFlushFailureWhenWritesToPrimaryStoreFailsWithNoSecondaryStore() throws InterruptedException, TimeoutException {
+
+        KafkaOffsetBackingStore connectorStore = setupOffsetBackingStoreWithProducer("topic1", true);
+
+        extractKeyValue(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED);
+
+        ConnectorOffsetBackingStore offsetBackingStore = ConnectorOffsetBackingStore.withOnlyConnectorStore(
+                () -> LoggingContext.forConnector("source-connector"),
+                connectorStore,
+                "offsets-topic",
+                mock(TopicAdmin.class));
+
+        OffsetStorageWriter offsetStorageWriter = new OffsetStorageWriter(offsetBackingStore, NAMESPACE, keyConverter, valueConverter);
+
+        offsetStorageWriter.offset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(offsetStorageWriter.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        Future<Void> flushFuture = offsetStorageWriter.doFlush((error, result) -> {
+            assertEquals(PRODUCE_EXCEPTION, error);
+            assertNull(result);
+        });
+        assertThrows(ExecutionException.class, () -> flushFuture.get(1000L, TimeUnit.MILLISECONDS));
+    }
+
+    @SuppressWarnings("unchecked")
+    private KafkaOffsetBackingStore setupOffsetBackingStoreWithProducer(String topic, boolean throwingProducer) {
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(() -> mock(TopicAdmin.class), () -> "connect", mock(Converter.class));
+        KafkaBasedLog<byte[], byte[]> kafkaBasedLog = new KafkaBasedLog<byte[], byte[]>(topic, new HashMap<>(), new HashMap<>(),
+                () -> mock(TopicAdmin.class), mock(Callback.class), new MockTime(), null);
+        Whitebox.setInternalState(kafkaBasedLog, "producer", Optional.of(createProducer(throwingProducer)));

Review Comment:
Maybe we could override the `KafkaBasedLog::createProducer` method instead of using `Whitebox.setInternalState` here?


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +284,61 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });

Review Comment:
Hm no, that's not what I was saying. My point was that the same error that we handle via the callback that we're passing into `KafkaOffsetBackingStore#set` will also be thrown (wrapped in an `ExecutionException`) when we call `get` on the `Future` returned from `KafkaOffsetBackingStore#set` thus rendering the error handling that we're doing in the passed callback redundant. I guess this is a result of the oddity of the `KafkaOffsetBackingStore#set` method signature - it's accepting a callback but also returning a future. Perhaps what we could do to avoid this double error handling is pass a `FutureCallback` to `KafkaOffsetBackingStore#set`, ignore the returned future and then call a get on the `FutureCallback` that we passed in?
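
A minimal, JDK-only sketch of the idea in the comment above: pass in a callback that is itself a future, ignore the future returned by the write, and handle the error exactly once when calling `get` on the callback. Everything named here is a hypothetical stand-in: `SimpleFutureCallback`, the nested `Callback` interface, and `set(boolean, ...)` are not the real Connect `FutureCallback`, `Callback`, or `KafkaOffsetBackingStore#set`, and the real classes may behave differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureCallbackSketch {

    // Hypothetical stand-in for a Connect-style callback interface.
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Hypothetical stand-in for a "future callback": a callback that is also a future,
    // so the outcome can be awaited directly on the object that was passed in.
    static class SimpleFutureCallback<V> extends CompletableFuture<V> implements Callback<V> {
        @Override
        public void onCompletion(Throwable error, V result) {
            if (error != null) {
                completeExceptionally(error);
            } else {
                complete(result);
            }
        }
    }

    // Hypothetical write with the same "callback in, future out" shape discussed above.
    // The same error reaches both the callback and the returned future.
    static Future<Void> set(boolean fail, Callback<Void> callback) {
        CompletableFuture<Void> returned = new CompletableFuture<>();
        if (fail) {
            RuntimeException e = new RuntimeException("simulated produce failure");
            callback.onCompletion(e, null);
            returned.completeExceptionally(e);
        } else {
            callback.onCompletion(null, null);
            returned.complete(null);
        }
        return returned;
    }

    // Single point of error handling: wait on the callback, not on the returned future.
    static String writeAndWait(boolean fail, long timeoutMs) {
        SimpleFutureCallback<Void> callback = new SimpleFutureCallback<>();
        set(fail, callback); // returned future deliberately ignored
        try {
            callback.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok";
        } catch (ExecutionException e) {
            // get() wraps the real failure, so unwrap it with getCause()
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            return "timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndWait(false, 1000L));
        System.out.println(writeAndWait(true, 1000L));
    }
}
```

With this shape there is no separate error-recording lambda and no `AtomicReference`; the `ExecutionException` branch is the only place the secondary-store failure is observed, and `getCause()` recovers the original exception.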


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##########
@@ -279,10 +284,61 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb
             throw new IllegalStateException("At least one non-null offset store must be provided");
         }
+        boolean containsTombstones = values.containsValue(null);
+
+        // If there are tombstone offsets, then the failure to write to secondary store will
+        // not be ignored. Also, for tombstone records, we first write to secondary store and
+        // then to primary stores.
+        if (secondaryStore != null && containsTombstones) {
+            AtomicReference<Throwable> secondaryStoreTombstoneWriteError = new AtomicReference<>();
+            Future<Void> secondaryWriteFuture = secondaryStore.set(values, (secondaryWriteError, ignored) -> {
+                try (LoggingContext context = loggingContext()) {
+                    if (secondaryWriteError != null) {
+                        log.warn("Failed to write offsets with tombstone records to secondary backing store", secondaryWriteError);
+                        secondaryStoreTombstoneWriteError.compareAndSet(null, secondaryWriteError);
+                    } else {
+                        log.debug("Successfully flushed tombstone offsets to secondary backing store");
+                    }
+                }
+            });
+            try {
+                // For EOS, there is no timeout for offset commit and it is allowed to take as much time as needed for
+                // commits. We still need to wait because we want to fail the offset commit for cases when
+                // tombstone records fail to be written to the secondary store. Note that while commitTransaction
+                // already waits for all records to be sent and ack'ed, in this case we do need to add an explicit
+                // blocking call. In case of ALOS, we wait for the same duration as `offset.commit.timeout.ms`
+                // and throw that exception which would allow the offset commit to fail.
+                if (isEOSEnabled) {
+                    secondaryWriteFuture.get();

Review Comment:
Just to clarify, this comment is stale now right?

Nice tests btw!


--
This is an automated message from the Apache Git Service.