This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c06d3f1  Add javadoc for stream ingestion integration tests (#9795)
c06d3f1 is described below

commit c06d3f14b148d7b68ab4a9c45bbb082ba33618aa
Author: Jihoon Son <[email protected]>
AuthorDate: Tue May 12 08:56:43 2020 -0700

    Add javadoc for stream ingestion integration tests (#9795)
---
 .../druid/testing/utils/KafkaEventWriter.java      |  6 ++++
 .../druid/testing/utils/KinesisEventWriter.java    | 12 ++++++--
 .../druid/testing/utils/StreamEventWriter.java     | 25 +++++++++++++++--
 .../testing/utils/SyntheticStreamGenerator.java    |  4 +--
 .../indexer/AbstractKafkaIndexingServiceTest.java  |  5 ++--
 .../AbstractKinesisIndexingServiceTest.java        | 12 +++++++-
 .../tests/indexer/AbstractStreamIndexingTest.java  | 32 ++++++++++++++--------
 .../ITKinesisIndexingServiceSerializedTest.java    |  6 ++--
 .../ITKinesisIndexingServiceDataFormatTest.java    |  2 +-
 .../ITKinesisIndexingServiceParallelizedTest.java  |  6 ++--
 10 files changed, 81 insertions(+), 29 deletions(-)

diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index 14b5714..a9d6da9 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -66,6 +66,12 @@ public class KafkaEventWriter implements StreamEventWriter
   }
 
   @Override
+  public boolean supportTransaction()
+  {
+    return true;
+  }
+
+  @Override
   public boolean isTransactionEnabled()
   {
     return txnEnabled;
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
index 39b700c..f0c66fc 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
@@ -60,21 +60,27 @@ public class KinesisEventWriter implements StreamEventWriter
   }
 
   @Override
-  public boolean isTransactionEnabled()
+  public boolean supportTransaction()
   {
     return false;
   }
 
   @Override
+  public boolean isTransactionEnabled()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public void initTransaction()
   {
-    // No-Op as Kinesis does not support transaction
+    throw new UnsupportedOperationException();
   }
 
   @Override
   public void commitTransaction()
   {
-    // No-Op as Kinesis does not support transaction
+    throw new UnsupportedOperationException();
   }
 
   @Override
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
index 747cbd8..2f6e065 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
@@ -28,22 +28,43 @@ import java.io.Closeable;
  */
 public interface StreamEventWriter extends Closeable
 {
+  /**
+   * Returns true if the stream supports transactions.
+   */
+  boolean supportTransaction();
+
+  /**
+   * Returns true if the transaction is enabled for this writer. Callers should check {@link #supportTransaction()}
+   * before calling this method.
+   *
+   * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+   */
   boolean isTransactionEnabled();
 
+  /**
+   * Initializes a transaction for this writer.
+   *
+   * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+   */
   void initTransaction();
 
+  /**
+   * Commits a transaction.
+   *
+   * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+   */
   void commitTransaction();
 
   void write(String topic, byte[] event);
 
   /**
-   * Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
+   * Flushes pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
    * Note that this method is not interruptible
    */
   void flush();
 
   /**
-   * Close this writer. Any resource should be cleaned up when this method is called.
+   * Closes this writer. Any resource should be cleaned up when this method is called.
    * Implementations must call {@link #flush()} before closing the writer.
    */
   @Override
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index c68db10..655a95c 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -81,7 +81,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
             nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
         );
 
-        if (streamEventWriter.isTransactionEnabled()) {
+        if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) {
           streamEventWriter.initTransaction();
         }
 
@@ -98,7 +98,7 @@ public abstract class SyntheticStreamGenerator implements StreamGenerator
           }
         }
 
-        if (streamEventWriter.isTransactionEnabled()) {
+        if (streamEventWriter.supportTransaction() && streamEventWriter.isTransactionEnabled()) {
           streamEventWriter.commitTransaction();
         }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 3bb9693..9d1a69b 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.tests.indexer;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.testing.IntegrationTestingConfig;
@@ -40,9 +41,9 @@ public abstract class AbstractKafkaIndexingServiceTest extends AbstractStreamInd
   }
 
   @Override
-  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+  public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, Boolean transactionEnabled)
   {
-    return new KafkaEventWriter(config, transactionEnabled);
+    return new KafkaEventWriter(config, Preconditions.checkNotNull(transactionEnabled, "transactionEnabled"));
   }
 
   @Override
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index b8095a3..25dd871 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -20,16 +20,20 @@
 package org.apache.druid.tests.indexer;
 
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.utils.KinesisAdminClient;
 import org.apache.druid.testing.utils.KinesisEventWriter;
 import org.apache.druid.testing.utils.StreamAdminClient;
 import org.apache.druid.testing.utils.StreamEventWriter;
 
+import javax.annotation.Nullable;
 import java.util.function.Function;
 
 public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamIndexingTest
 {
+  private static final Logger LOG = new Logger(AbstractKinesisIndexingServiceTest.class);
+
   @Override
   StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception
   {
@@ -37,9 +41,15 @@ public abstract class AbstractKinesisIndexingServiceTest extends AbstractStreamI
   }
 
   @Override
-  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
+  StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, @Nullable Boolean transactionEnabled)
       throws Exception
   {
+    if (transactionEnabled != null) {
+      LOG.warn(
+          "Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]",
+          transactionEnabled
+      );
+    }
     return new KinesisEventWriter(config.getStreamEndpoint(), false);
   }
 
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 506e80f..9a1dbb3 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -40,6 +40,7 @@ import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
@@ -95,8 +96,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
 
   abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config) throws Exception;
 
-  abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config, boolean transactionEnabled)
-      throws Exception;
+  /**
+   * Create an event writer for an underlying stream. {@code transactionEnabled} should not be null if the stream
+   * supports transactions. It is ignored otherwise.
+   */
+  abstract StreamEventWriter createStreamEventWriter(
+      IntegrationTestingConfig config,
+      @Nullable Boolean transactionEnabled
+  ) throws Exception;
 
   abstract Function<String, String> generateStreamIngestionPropsTransform(
       String streamName,
@@ -160,7 +167,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
   }
 
   protected void doTestIndexDataStableState(
-      boolean transactionEnabled,
+      @Nullable Boolean transactionEnabled,
       String serializerPath,
       String parserType,
       String specPath
@@ -194,7 +201,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
-  void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws Exception
+  void doTestIndexDataWithLosingCoordinator(@Nullable Boolean transactionEnabled) throws Exception
   {
     testIndexWithLosingNodeHelper(
         () -> druidClusterAdminClient.restartCoordinatorContainer(),
@@ -203,7 +210,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     );
   }
 
-  void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws Exception
+  void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled) throws Exception
   {
     testIndexWithLosingNodeHelper(
         () -> druidClusterAdminClient.restartIndexerContainer(),
@@ -212,7 +219,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     );
   }
 
-  void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws Exception
+  void doTestIndexDataWithLosingHistorical(@Nullable Boolean transactionEnabled) throws Exception
   {
     testIndexWithLosingNodeHelper(
         () -> druidClusterAdminClient.restartHistoricalContainer(),
@@ -221,7 +228,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     );
   }
 
-  protected void doTestIndexDataWithStartStopSupervisor(boolean transactionEnabled) throws Exception
+  protected void doTestIndexDataWithStartStopSupervisor(@Nullable Boolean transactionEnabled) throws Exception
   {
     final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
         INPUT_FORMAT,
@@ -284,22 +291,22 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
-  protected void doTestIndexDataWithStreamReshardSplit(boolean transactionEnabled) throws Exception
+  protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
   {
     // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
     testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT * 2);
   }
 
-  protected void doTestIndexDataWithStreamReshardMerge(boolean transactionEnabled) throws Exception
+  protected void doTestIndexDataWithStreamReshardMerge() throws Exception
   {
     // Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
-    testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT / 2);
+    testIndexWithStreamReshardHelper(null, STREAM_SHARD_COUNT / 2);
   }
 
   private void testIndexWithLosingNodeHelper(
       Runnable restartRunnable,
       Runnable waitForReadyRunnable,
-      boolean transactionEnabled
+      @Nullable Boolean transactionEnabled
   ) throws Exception
   {
     final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
@@ -376,7 +383,8 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
     }
   }
 
-  private void testIndexWithStreamReshardHelper(boolean transactionEnabled, int newShardCount) throws Exception
+  private void testIndexWithStreamReshardHelper(@Nullable Boolean transactionEnabled, int newShardCount)
+      throws Exception
   {
     final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
         INPUT_FORMAT,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
index fed1361..02e9b6d 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
@@ -47,7 +47,7 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
   @Test
   public void testKinesisIndexDataWithLosingCoordinator() throws Exception
   {
-    doTestIndexDataWithLosingCoordinator(false);
+    doTestIndexDataWithLosingCoordinator(null);
   }
 
   /**
@@ -56,7 +56,7 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
   @Test
   public void testKinesisIndexDataWithLosingOverlord() throws Exception
   {
-    doTestIndexDataWithLosingOverlord(false);
+    doTestIndexDataWithLosingOverlord(null);
   }
 
   /**
@@ -65,6 +65,6 @@ public class ITKinesisIndexingServiceSerializedTest extends AbstractKinesisIndex
   @Test
   public void testKinesisIndexDataWithLosingHistorical() throws Exception
   {
-    doTestIndexDataWithLosingHistorical(false);
+    doTestIndexDataWithLosingHistorical(null);
   }
 }
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
index c302cd1..47ffffd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
@@ -85,7 +85,7 @@ public class ITKinesisIndexingServiceDataFormatTest extends AbstractKinesisIndex
   public void testIndexData(String serializerPath, String parserType, String specPath)
       throws Exception
   {
-    doTestIndexDataStableState(false, serializerPath, parserType, specPath);
+    doTestIndexDataStableState(null, serializerPath, parserType, specPath);
   }
 
   @Override
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
index efd107f..968f794 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -49,7 +49,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
   {
-    doTestIndexDataWithStartStopSupervisor(false);
+    doTestIndexDataWithStartStopSupervisor(null);
   }
 
   /**
@@ -59,7 +59,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
   {
-    doTestIndexDataWithStreamReshardSplit(false);
+    doTestIndexDataWithStreamReshardSplit(null);
   }
 
   /**
@@ -69,6 +69,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends AbstractKinesisInd
   @Test
   public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
   {
-    doTestIndexDataWithStreamReshardMerge(false);
+    doTestIndexDataWithStreamReshardMerge();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to