Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/4605#discussion_r139485057
--- Diff:
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
---
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CassandraSinkBaseTest.class);
+
+ private static final String DUMMY_QUERY_STMT = "CQL_Dummy_Stmt";
+
+ private static final String DUMMY_MESSAGE = "Dummy_msg";
+
+ private static final int MAX_THREAD_NUM = 3;
+
+ private static final int MAX_THREAD_POOL_SIZE = 2;
+
+ @BeforeClass
+ public static void doSetUp() throws Exception {
+
+ }
+
+ @BeforeClass
+ public static void doTearDown() throws Exception {
+
+ }
+
+ /**
+ * Test ensures a NoHostAvailableException would be thrown if a contact
point added does not exist.
+ */
+ @Test(expected = NoHostAvailableException.class)
+ public void testCasHostNotFoundErrorHandling() throws Exception {
+ CassandraSinkBase base = new DummyCassandraSinkBase<>(new
ClusterBuilder() {
+ @Override
+ protected Cluster buildCluster(Cluster.Builder builder)
{
+ return builder
+ .addContactPoint("127.0.0.1")
+ .withoutJMXReporting()
+ .withoutMetrics().build();
+ }
+ });
+
+ base.open(new Configuration());
+ base.close();
+ }
+
+
+ ///////////////////////
+ // Single Thread Test
+ ///////////////////////
+
+ /**
+ * Test ensures the message could be delivered successfully to sink.
+ */
+ @Test
+ public void testSimpleSuccessfulPath() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+ casSinkFunc.open(new Configuration());
+
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.IMMEDIATE_SUCCESS);
+
+ casSinkFunc.close();
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+ }
+
+ /**
+ * Test ensures that an asyncError would be thrown on close() if
previously message delivery failed.
+ */
+ @Test
+ public void testAsyncErrorThrownOnClose() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ casSinkFunc.open(new Configuration());
+
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.IMMEDIATE_FAILURE);
+ try {
+
+ casSinkFunc.close();
+ } catch (IOException e) {
+ //expected async error from close()
+
+ Assert.assertTrue(e.getMessage().contains("Error while
sending value"));
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0,
casSinkFunc.getNumOfPendingRecords());
+
+ //done
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensures that an asyncError would be thrown on invoke() if
previously message delivery failed.
+ */
+ //TODO: should unitfy error handling logic in CassandraSinkBase
+ //Exception would have been thrown from invoke(), but asyncError was
not set null, hence it was rethrown in close()
+ @Ignore
+ @Test
+ public void testAsyncErrorThrownOnInvoke() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ casSinkFunc.open(new Configuration());
+
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.IMMEDIATE_FAILURE);
+ try {
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.IMMEDIATE_SUCCESS);
+ } catch (IOException e) {
+ //expected async error thrown from invoke()
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0,
casSinkFunc.getNumOfPendingRecords());
+
+ casSinkFunc.close();
+
+ //done
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ /**
+ * Test ensures that an asyncError would be thrown when checkpoint
performs if previously message delivery failed.
+ */
+ @Test
+ public void testAsyncErrorThrownOnCheckpoint() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(casSinkFunc));
+
+ testHarness.open();
+
+ casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.IMMEDIATE_FAILURE);
+
+ try {
+ testHarness.snapshot(123L, 123L);
+ } catch (Exception e) {
+ //expected async error from snapshotState()
+
+ Assert.assertTrue(e.getCause() instanceof
IllegalStateException);
+
Assert.assertTrue(e.getCause().getMessage().contains("Failed to send data to
Cassandra"));
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0,
casSinkFunc.getNumOfPendingRecords());
+
+ casSinkFunc.close();
+
+ //done
+ return;
+ }
+
+ Assert.fail();
+ }
+
+ ///////////////////////
+ // Multi Thread Test
+ ///////////////////////
+
+ /**
+ * Test ensures that CassandraSinkBase would flush all in-flight
message on close(), accompanied with concurrent
+ * message delivery successfully via a thraedpool.
+ */
+ @Test
+ public void testFlushOnPendingRecordsOnCloseWithSuccessfulMessage()
throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+
+ ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ casSinkFunc.open(new Configuration());
+
+ for (int i = 0; i < MAX_THREAD_NUM; i++) {
+ threadPool.submit(() -> {
+ try {
+
+
casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.DELAYED_SUCCESS);
+ } catch (Exception e) {
+ LOG.error("Error while dispatching
sending message to Cassandra sink => {} ", e);
+ }
+ });
+ }
+
+ //wait until the first message has been dispatched and invoked
+ Thread.sleep(500);
+
+ casSinkFunc.close();
+
+ threadPool.shutdown();
+ threadPool.awaitTermination(10, TimeUnit.SECONDS);
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+ }
+
+ /**
+ * Test ensures that CassandraSinkBase would flush all in-flight
message when checkpoint performs, accompanied
+ * with concurrent message delivery successfully via a thraedpool.
+ */
+ @Test
+ public void testFlushOnPendingRecordsOnCheckpoint() throws Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+
+ ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(casSinkFunc));
+
+ testHarness.open();
+
+ casSinkFunc.open(new Configuration());
+
+ for (int i = 0; i < MAX_THREAD_NUM; i++) {
+ threadPool.submit(() -> {
+ try {
+
+
casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.DELAYED_SUCCESS);
+ } catch (Exception e) {
+ LOG.error("Error while dispatching
sending message to Cassandra sink => {} ", e);
+ }
+ });
+ Thread.sleep(500);
+ }
+
+ //wait until the first message has been dispatched and invoked
+ Thread.sleep(500);
+
+ testHarness.snapshot(123L, 123L);
+
+ threadPool.shutdown();
+ threadPool.awaitTermination(10, TimeUnit.SECONDS);
+
+ Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+
+ casSinkFunc.close();
+ }
+
+ /**
+ * Test ensures that CassandraSinkBase would NOT flush all in-flight
message when checkpoint performs.
+ */
+ @Test
+ public void testDoNotFlushOnPendingRecordsOnCheckpoint() throws
Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+
+ ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(false);
+
+ OneInputStreamOperatorTestHarness<String, Object> testHarness =
+ new OneInputStreamOperatorTestHarness<>(new
StreamSink<>(casSinkFunc));
+
+ testHarness.open();
+
+ casSinkFunc.open(new Configuration());
+
+ for (int i = 0; i < MAX_THREAD_NUM; i++) {
+ threadPool.submit(() -> {
+ try {
+
+
casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.DELAYED_SUCCESS);
+ } catch (Exception e) {
+ LOG.error("Error while dispatching
sending message to Cassandra sink => {} ", e);
+ }
+ });
+ Thread.sleep(500);
+ }
+
+ //wait until the first message has been dispatched and invoked
+ Thread.sleep(500);
+
+ testHarness.snapshot(123L, 123L);
+ //Final pending records # > 0
+ Assert.assertTrue(casSinkFunc.getNumOfPendingRecords() > 0);
+
+ threadPool.shutdown();
+ threadPool.awaitTermination(10, TimeUnit.SECONDS);
+
+ casSinkFunc.close();
+ }
+
+ /**
+ * Test ensures that CassandraSinkBase would flush all in-flight
message on close(), accompanied with concurrent
+ * thread dispatched message failure via a thraedpool.
+ */
+ @Test
+ public void testFlushOnPendingRecordsOnCloseWithFailedMessage() throws
Exception {
+ ClusterBuilder builder = mock(ClusterBuilder.class);
+
+ ExecutorService threadPool =
Executors.newFixedThreadPool(MAX_THREAD_POOL_SIZE);
+ MockCassandraSinkBase casSinkFunc = new
MockCassandraSinkBase<>(builder);
+ casSinkFunc.setFlushOnCheckpoint(true);
+
+ casSinkFunc.open(new Configuration());
+ try {
+ for (int i = 0; i < MAX_THREAD_NUM; i++) {
+ threadPool.submit(() -> {
+ try {
+
+
casSinkFunc.invokeWithImmediateResult(DUMMY_MESSAGE,
PredeterminedResult.DELAYED_FAILURE);
+ } catch (Exception e) {
+ LOG.error("Error while
dispatching sending message to Cassandra sink => {} ", e);
+ }
+
+ });
+ }
+ //wait until the first message has been dispatched and
invoked
+ Thread.sleep(500);
+
+ casSinkFunc.close();
+ } catch (IOException e) {
+ //expected async error from close()
+ } finally {
+ //wait for all message dispatching threads to end
+ threadPool.shutdown();
+ threadPool.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ //Final pending updates should be zero
+ Assert.assertEquals(0, casSinkFunc.getNumOfPendingRecords());
+ }
+
+ ////////////////////////////////
+ // Utilities
+ ///////////////////////////////
+
+ private enum PredeterminedResult {
+ IMMEDIATE_SUCCESS,
+ IMMEDIATE_FAILURE,
+ IMMEDIATE_CANCELLATION,
+ DELAYED_SUCCESS,
+ DELAYED_FAILURE
+ }
+
+ private static class DummyCassandraSinkBase<IN, V> extends
CassandraSinkBase<IN, V> {
+
+ @SuppressWarnings("unchecked")
+ DummyCassandraSinkBase(ClusterBuilder clusterBuilder) {
+ super(clusterBuilder);
+ }
+
+ @Override
+ public ListenableFuture<V> send(IN value) {
+ return (ListenableFuture<V>)
session.executeAsync(DUMMY_QUERY_STMT);
+ }
+
+ }
+
+ private static class MockCassandraSinkBase<IN> extends
CassandraSinkBase<IN, ResultSet> {
+
+ @SuppressWarnings("unchecked")
+ MockCassandraSinkBase(ClusterBuilder clusterBuilder) {
+ super(clusterBuilder);
+
+ cluster = mock(Cluster.class);
+ session = mock(Session.class);
+ when(builder.getCluster()).thenReturn(cluster);
+ when(cluster.connect()).thenReturn(session);
+ }
+
+ public void invokeWithImmediateResult(IN value,
PredeterminedResult result) throws Exception {
+
when(session.executeAsync(DUMMY_QUERY_STMT)).thenAnswer(new
Answer<ResultSetFuture>() {
+ @Override
+ public ResultSetFuture
answer(InvocationOnMock invocationOnMock) throws Throwable {
+ ResultSetFuture
predeterminedFuture = null;
+
+ switch (result) {
+ case IMMEDIATE_FAILURE:
+
predeterminedFuture = ResultSetFutures.immediateFailedFuture(new
IllegalStateException("Immediate Failure!"));
+ break;
+
+ case
IMMEDIATE_CANCELLATION:
+
predeterminedFuture = ResultSetFutures.immediateCancelledFuture();
+ break;
+
+ case DELAYED_FAILURE:
+
predeterminedFuture = ResultSetFutures.delayedFailedFuture(new
IllegalStateException("Delayed Failure!"));
+ break;
+
+ case DELAYED_SUCCESS:
+
predeterminedFuture = ResultSetFutures.delayedFuture(null);
+ break;
+ //If not specified, set
result to Successful
+ default:
+ case IMMEDIATE_SUCCESS:
+
predeterminedFuture = ResultSetFutures.immediateFuture(null);
+ break;
+ }
+
+ log.info("Invoke with {} of
{}", value, result.name());
+
+ return predeterminedFuture;
+ }
+ }
+ );
+ invoke(value);
+ }
+
+ @Override
+ public ListenableFuture<ResultSet> send(IN value) {
+ return (ListenableFuture<ResultSet>)
session.executeAsync(DUMMY_QUERY_STMT);
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
--- End diff --
remove
---