This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c06d3f1 Add javadoc for stream ingestion integration tests (#9795)
c06d3f1 is described below
commit c06d3f14b148d7b68ab4a9c45bbb082ba33618aa
Author: Jihoon Son <[email protected]>
AuthorDate: Tue May 12 08:56:43 2020 -0700
Add javadoc for stream ingestion integration tests (#9795)
---
.../druid/testing/utils/KafkaEventWriter.java | 6 ++++
.../druid/testing/utils/KinesisEventWriter.java | 12 ++++++--
.../druid/testing/utils/StreamEventWriter.java | 25 +++++++++++++++--
.../testing/utils/SyntheticStreamGenerator.java | 4 +--
.../indexer/AbstractKafkaIndexingServiceTest.java | 5 ++--
.../AbstractKinesisIndexingServiceTest.java | 12 +++++++-
.../tests/indexer/AbstractStreamIndexingTest.java | 32 ++++++++++++++--------
.../ITKinesisIndexingServiceSerializedTest.java | 6 ++--
.../ITKinesisIndexingServiceDataFormatTest.java | 2 +-
.../ITKinesisIndexingServiceParallelizedTest.java | 6 ++--
10 files changed, 81 insertions(+), 29 deletions(-)
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
index 14b5714..a9d6da9 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KafkaEventWriter.java
@@ -66,6 +66,12 @@ public class KafkaEventWriter implements StreamEventWriter
}
@Override
+ public boolean supportTransaction()
+ {
+ return true;
+ }
+
+ @Override
public boolean isTransactionEnabled()
{
return txnEnabled;
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
index 39b700c..f0c66fc 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/KinesisEventWriter.java
@@ -60,21 +60,27 @@ public class KinesisEventWriter implements StreamEventWriter
}
@Override
- public boolean isTransactionEnabled()
+ public boolean supportTransaction()
{
return false;
}
@Override
+ public boolean isTransactionEnabled()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void initTransaction()
{
- // No-Op as Kinesis does not support transaction
+ throw new UnsupportedOperationException();
}
@Override
public void commitTransaction()
{
- // No-Op as Kinesis does not support transaction
+ throw new UnsupportedOperationException();
}
@Override
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
index 747cbd8..2f6e065 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/StreamEventWriter.java
@@ -28,22 +28,43 @@ import java.io.Closeable;
*/
public interface StreamEventWriter extends Closeable
{
+ /**
+ * Returns true if the stream supports transactions.
+ */
+ boolean supportTransaction();
+
+ /**
+ * Returns true if the transaction is enabled for this writer. Callers should check {@link #supportTransaction()}
+ * before calling this method.
+ *
+ * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+ */
boolean isTransactionEnabled();
+ /**
+ * Initializes a transaction for this writer.
+ *
+ * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+ */
void initTransaction();
+ /**
+ * Commits a transaction.
+ *
+ * @throws UnsupportedOperationException if {@link #supportTransaction()} returns false.
+ */
void commitTransaction();
void write(String topic, byte[] event);
/**
- * Flush pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
+ * Flushes pending writes on the underlying stream. This method is synchronous and waits until the flush completes.
* Note that this method is not interruptible
*/
void flush();
/**
- * Close this writer. Any resource should be cleaned up when this method is called.
+ * Closes this writer. Any resource should be cleaned up when this method is called.
* Implementations must call {@link #flush()} before closing the writer.
*/
@Override
diff --git
a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
index c68db10..655a95c 100644
---
a/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
+++
b/integration-tests/src/main/java/org/apache/druid/testing/utils/SyntheticStreamGenerator.java
@@ -81,7 +81,7 @@ public abstract class SyntheticStreamGenerator implements
StreamGenerator
nowCeilingToSecond.plusSeconds(1).minus(cyclePaddingMs)
);
- if (streamEventWriter.isTransactionEnabled()) {
+ if (streamEventWriter.supportTransaction() &&
streamEventWriter.isTransactionEnabled()) {
streamEventWriter.initTransaction();
}
@@ -98,7 +98,7 @@ public abstract class SyntheticStreamGenerator implements
StreamGenerator
}
}
- if (streamEventWriter.isTransactionEnabled()) {
+ if (streamEventWriter.supportTransaction() &&
streamEventWriter.isTransactionEnabled()) {
streamEventWriter.commitTransaction();
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
index 3bb9693..9d1a69b 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKafkaIndexingServiceTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.tests.indexer;
+import com.google.common.base.Preconditions;
import org.apache.druid.indexing.kafka.KafkaConsumerConfigs;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.testing.IntegrationTestingConfig;
@@ -40,9 +41,9 @@ public abstract class AbstractKafkaIndexingServiceTest
extends AbstractStreamInd
}
@Override
- public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig
config, boolean transactionEnabled)
+ public StreamEventWriter createStreamEventWriter(IntegrationTestingConfig
config, Boolean transactionEnabled)
{
- return new KafkaEventWriter(config, transactionEnabled);
+ return new KafkaEventWriter(config,
Preconditions.checkNotNull(transactionEnabled, "transactionEnabled"));
}
@Override
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
index b8095a3..25dd871 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractKinesisIndexingServiceTest.java
@@ -20,16 +20,20 @@
package org.apache.druid.tests.indexer;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.utils.KinesisAdminClient;
import org.apache.druid.testing.utils.KinesisEventWriter;
import org.apache.druid.testing.utils.StreamAdminClient;
import org.apache.druid.testing.utils.StreamEventWriter;
+import javax.annotation.Nullable;
import java.util.function.Function;
public abstract class AbstractKinesisIndexingServiceTest extends
AbstractStreamIndexingTest
{
+ private static final Logger LOG = new
Logger(AbstractKinesisIndexingServiceTest.class);
+
@Override
StreamAdminClient createStreamAdminClient(IntegrationTestingConfig config)
throws Exception
{
@@ -37,9 +41,15 @@ public abstract class AbstractKinesisIndexingServiceTest
extends AbstractStreamI
}
@Override
- StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config,
boolean transactionEnabled)
+ StreamEventWriter createStreamEventWriter(IntegrationTestingConfig config,
@Nullable Boolean transactionEnabled)
throws Exception
{
+ if (transactionEnabled != null) {
+ LOG.warn(
+ "Kinesis event writer doesn't support transaction. Ignoring the given parameter transactionEnabled[%s]",
+ transactionEnabled
+ );
+ }
return new KinesisEventWriter(config.getStreamEndpoint(), false);
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
index 506e80f..9a1dbb3 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java
@@ -40,6 +40,7 @@ import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
@@ -95,8 +96,14 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
abstract StreamAdminClient createStreamAdminClient(IntegrationTestingConfig
config) throws Exception;
- abstract StreamEventWriter createStreamEventWriter(IntegrationTestingConfig
config, boolean transactionEnabled)
- throws Exception;
+ /**
+ * Create an event writer for an underlying stream. {@code transactionEnabled} should not be null if the stream
+ * supports transactions. It is ignored otherwise.
+ */
+ abstract StreamEventWriter createStreamEventWriter(
+ IntegrationTestingConfig config,
+ @Nullable Boolean transactionEnabled
+ ) throws Exception;
abstract Function<String, String> generateStreamIngestionPropsTransform(
String streamName,
@@ -160,7 +167,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
protected void doTestIndexDataStableState(
- boolean transactionEnabled,
+ @Nullable Boolean transactionEnabled,
String serializerPath,
String parserType,
String specPath
@@ -194,7 +201,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
}
- void doTestIndexDataWithLosingCoordinator(boolean transactionEnabled) throws
Exception
+ void doTestIndexDataWithLosingCoordinator(@Nullable Boolean
transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartCoordinatorContainer(),
@@ -203,7 +210,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
);
}
- void doTestIndexDataWithLosingOverlord(boolean transactionEnabled) throws
Exception
+ void doTestIndexDataWithLosingOverlord(@Nullable Boolean transactionEnabled)
throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartIndexerContainer(),
@@ -212,7 +219,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
);
}
- void doTestIndexDataWithLosingHistorical(boolean transactionEnabled) throws
Exception
+ void doTestIndexDataWithLosingHistorical(@Nullable Boolean
transactionEnabled) throws Exception
{
testIndexWithLosingNodeHelper(
() -> druidClusterAdminClient.restartHistoricalContainer(),
@@ -221,7 +228,7 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
);
}
- protected void doTestIndexDataWithStartStopSupervisor(boolean
transactionEnabled) throws Exception
+ protected void doTestIndexDataWithStartStopSupervisor(@Nullable Boolean
transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
@@ -284,22 +291,22 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
}
- protected void doTestIndexDataWithStreamReshardSplit(boolean
transactionEnabled) throws Exception
+ protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean
transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2
testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT *
2);
}
- protected void doTestIndexDataWithStreamReshardMerge(boolean
transactionEnabled) throws Exception
+ protected void doTestIndexDataWithStreamReshardMerge() throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT / 2
- testIndexWithStreamReshardHelper(transactionEnabled, STREAM_SHARD_COUNT /
2);
+ testIndexWithStreamReshardHelper(null, STREAM_SHARD_COUNT / 2);
}
private void testIndexWithLosingNodeHelper(
Runnable restartRunnable,
Runnable waitForReadyRunnable,
- boolean transactionEnabled
+ @Nullable Boolean transactionEnabled
) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
@@ -376,7 +383,8 @@ public abstract class AbstractStreamIndexingTest extends
AbstractIndexerTest
}
}
- private void testIndexWithStreamReshardHelper(boolean transactionEnabled,
int newShardCount) throws Exception
+ private void testIndexWithStreamReshardHelper(@Nullable Boolean
transactionEnabled, int newShardCount)
+ throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
index fed1361..02e9b6d 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKinesisIndexingServiceSerializedTest.java
@@ -47,7 +47,7 @@ public class ITKinesisIndexingServiceSerializedTest extends
AbstractKinesisIndex
@Test
public void testKinesisIndexDataWithLosingCoordinator() throws Exception
{
- doTestIndexDataWithLosingCoordinator(false);
+ doTestIndexDataWithLosingCoordinator(null);
}
/**
@@ -56,7 +56,7 @@ public class ITKinesisIndexingServiceSerializedTest extends
AbstractKinesisIndex
@Test
public void testKinesisIndexDataWithLosingOverlord() throws Exception
{
- doTestIndexDataWithLosingOverlord(false);
+ doTestIndexDataWithLosingOverlord(null);
}
/**
@@ -65,6 +65,6 @@ public class ITKinesisIndexingServiceSerializedTest extends
AbstractKinesisIndex
@Test
public void testKinesisIndexDataWithLosingHistorical() throws Exception
{
- doTestIndexDataWithLosingHistorical(false);
+ doTestIndexDataWithLosingHistorical(null);
}
}
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
index c302cd1..47ffffd 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceDataFormatTest.java
@@ -85,7 +85,7 @@ public class ITKinesisIndexingServiceDataFormatTest extends
AbstractKinesisIndex
public void testIndexData(String serializerPath, String parserType, String
specPath)
throws Exception
{
- doTestIndexDataStableState(false, serializerPath, parserType, specPath);
+ doTestIndexDataStableState(null, serializerPath, parserType, specPath);
}
@Override
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
index efd107f..968f794 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKinesisIndexingServiceParallelizedTest.java
@@ -49,7 +49,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends
AbstractKinesisInd
@Test
public void testKinesisIndexDataWithStartStopSupervisor() throws Exception
{
- doTestIndexDataWithStartStopSupervisor(false);
+ doTestIndexDataWithStartStopSupervisor(null);
}
/**
@@ -59,7 +59,7 @@ public class ITKinesisIndexingServiceParallelizedTest extends
AbstractKinesisInd
@Test
public void testKinesisIndexDataWithKinesisReshardSplit() throws Exception
{
- doTestIndexDataWithStreamReshardSplit(false);
+ doTestIndexDataWithStreamReshardSplit(null);
}
/**
@@ -69,6 +69,6 @@ public class ITKinesisIndexingServiceParallelizedTest extends
AbstractKinesisInd
@Test
public void testKinesisIndexDataWithKinesisReshardMerge() throws Exception
{
- doTestIndexDataWithStreamReshardMerge(false);
+ doTestIndexDataWithStreamReshardMerge();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]