This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new dbb4ceb  [FLINK-38034] Added WriteAheadSinkTestBase to Cassandra Repo
dbb4ceb is described below

commit dbb4ceb2782e6e05fba557c83e87be1009d988ec
Author: Poorvank <poorv...@uber.com>
AuthorDate: Mon Jul 21 14:59:44 2025 +0530

    [FLINK-38034] Added WriteAheadSinkTestBase to Cassandra Repo
---
 .../cassandra/CassandraConnectorITCase.java        |   1 -
 .../cassandra/WriteAheadSinkTestBase.java          | 329 +++++++++++++++++++++
 2 files changed, 329 insertions(+), 1 deletion(-)

diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index f12e595..fd5925f 100644
--- a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
-import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.testutils.junit.extensions.retry.RetryExtension;
diff --git a/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/WriteAheadSinkTestBase.java b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/WriteAheadSinkTestBase.java
new file mode 100644
index 0000000..c726b5d
--- /dev/null
+++ b/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/WriteAheadSinkTestBase.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import org.junit.Test;
+
+/**
+ * NOTE: This is a local copy of {@link
+ * org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase} from Flink's repo.
+ *
+ * <p>During Flink's JUnit 5 migration, WriteAheadSinkTestBase became package-private in
+ * flink-streaming-java, breaking external connectors that extended it for testing. (FLINK-25544)
+ *
+ * <p>This copy allows the Cassandra connector to continue testing write-ahead sink functionality
+ * without depending on Flink's internal test classes.
+ *
+ * <p>See FLINK-38034 for more details.
+ */
+public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> {
+
+    protected abstract S createSink() throws Exception;
+
+    protected abstract TypeInformation<IN> createTypeInfo();
+
+    protected abstract IN generateValue(int counter, int checkpointID);
+
+    protected abstract void verifyResultsIdealCircumstances(S sink) throws Exception;
+
+    protected abstract void verifyResultsDataPersistenceUponMissedNotify(S sink) throws Exception;
+
+    protected abstract void verifyResultsDataDiscardingUponRestore(S sink) throws Exception;
+
+    protected abstract void verifyResultsWhenReScaling(
+            S sink, int startElementCounter, int endElementCounter) throws Exception;
+
+    private final int maxParallelism = 10;
+
+    @Test
+    public void testIdealCircumstances() throws Exception {
+        S sink = createSink();
+
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness =
+                new OneInputStreamOperatorTestHarness<>(sink);
+
+        testHarness.open();
+
+        int elementCounter = 1;
+        int snapshotCount = 0;
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        verifyResultsIdealCircumstances(sink);
+    }
+
+    @Test
+    public void testDataPersistenceUponMissedNotify() throws Exception {
+        S sink = createSink();
+
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness =
+                new OneInputStreamOperatorTestHarness<>(sink);
+
+        testHarness.open();
+
+        int elementCounter = 1;
+        int snapshotCount = 0;
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        verifyResultsDataPersistenceUponMissedNotify(sink);
+    }
+
+    @Test
+    public void testDataDiscardingUponRestore() throws Exception {
+        S sink = createSink();
+
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness =
+                new OneInputStreamOperatorTestHarness<>(sink);
+
+        testHarness.open();
+
+        int elementCounter = 1;
+        int snapshotCount = 0;
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        OperatorSubtaskState latestSnapshot = testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 1)));
+            elementCounter++;
+        }
+
+        testHarness.close();
+
+        sink = createSink();
+
+        testHarness = new OneInputStreamOperatorTestHarness<>(sink);
+
+        testHarness.setup();
+        testHarness.initializeState(latestSnapshot);
+        testHarness.open();
+
+        for (int x = 0; x < 20; x++) {
+            testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));
+            elementCounter++;
+        }
+
+        testHarness.snapshot(snapshotCount++, 0);
+        testHarness.notifyOfCompletedCheckpoint(snapshotCount - 1);
+
+        verifyResultsDataDiscardingUponRestore(sink);
+    }
+
+    @Test
+    public void testScalingDown() throws Exception {
+        S sink1 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+                new OneInputStreamOperatorTestHarness<>(sink1, maxParallelism, 2, 0);
+        testHarness1.open();
+
+        S sink2 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+                new OneInputStreamOperatorTestHarness<>(sink2, maxParallelism, 2, 1);
+        testHarness2.open();
+
+        int elementCounter = 1;
+        int snapshotCount = 0;
+
+        for (int x = 0; x < 10; x++) {
+            testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        for (int x = 0; x < 11; x++) {
+            testHarness2.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        // snapshot at checkpoint 0 for testHarness1 and testHarness 2
+        OperatorSubtaskState snapshot1 = testHarness1.snapshot(snapshotCount, 0);
+        OperatorSubtaskState snapshot2 = testHarness2.snapshot(snapshotCount, 0);
+
+        // merge the two partial states
+        OperatorSubtaskState mergedSnapshot =
+                AbstractStreamOperatorTestHarness.repackageState(snapshot1, snapshot2);
+
+        testHarness1.close();
+        testHarness2.close();
+
+        // and create a third instance that operates alone but
+        // has the merged state of the previous 2 instances
+
+        OperatorSubtaskState initState =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        mergedSnapshot, maxParallelism, 2, 1, 0);
+
+        S sink3 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> mergedTestHarness =
+                new OneInputStreamOperatorTestHarness<>(sink3, maxParallelism, 1, 0);
+
+        mergedTestHarness.setup();
+        mergedTestHarness.initializeState(initState);
+        mergedTestHarness.open();
+
+        for (int x = 0; x < 12; x++) {
+            mergedTestHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        snapshotCount++;
+        mergedTestHarness.snapshot(snapshotCount, 1);
+        mergedTestHarness.notifyOfCompletedCheckpoint(snapshotCount);
+
+        verifyResultsWhenReScaling(sink3, 1, 33);
+        mergedTestHarness.close();
+    }
+
+    @Test
+    public void testScalingUp() throws Exception {
+
+        S sink1 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
+                new OneInputStreamOperatorTestHarness<>(sink1, maxParallelism, 1, 0);
+
+        int elementCounter = 1;
+        int snapshotCount = 0;
+
+        testHarness1.open();
+
+        // put two more checkpoints as pending
+
+        for (int x = 0; x < 10; x++) {
+            testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+        testHarness1.snapshot(++snapshotCount, 0);
+
+        for (int x = 0; x < 11; x++) {
+            testHarness1.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        // this will be the state that will be split between the two new operators
+        OperatorSubtaskState snapshot = testHarness1.snapshot(++snapshotCount, 0);
+
+        testHarness1.close();
+
+        // verify no elements are in the sink
+        verifyResultsWhenReScaling(sink1, 0, -1);
+
+        // we will create two operator instances, testHarness2 and testHarness3,
+        // that will share the state of testHarness1
+
+        OperatorSubtaskState initState1 =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot, maxParallelism, 1, 2, 0);
+
+        OperatorSubtaskState initState2 =
+                AbstractStreamOperatorTestHarness.repartitionOperatorState(
+                        snapshot, maxParallelism, 1, 2, 1);
+
+        ++snapshotCount;
+
+        S sink2 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
+                new OneInputStreamOperatorTestHarness<>(sink2, maxParallelism, 2, 0);
+
+        testHarness2.setup();
+        testHarness2.initializeState(initState1);
+        testHarness2.open();
+
+        testHarness2.notifyOfCompletedCheckpoint(snapshotCount);
+
+        verifyResultsWhenReScaling(sink2, 1, 10);
+
+        S sink3 = createSink();
+        OneInputStreamOperatorTestHarness<IN, IN> testHarness3 =
+                new OneInputStreamOperatorTestHarness<>(sink3, maxParallelism, 2, 1);
+
+        testHarness3.setup();
+        testHarness3.initializeState(initState2);
+        testHarness3.open();
+
+        // add some more elements to verify that everything functions normally from now on...
+
+        for (int x = 0; x < 10; x++) {
+            testHarness3.processElement(new StreamRecord<>(generateValue(elementCounter, 0)));
+            elementCounter++;
+        }
+
+        testHarness3.snapshot(snapshotCount, 1);
+        testHarness3.notifyOfCompletedCheckpoint(snapshotCount);
+
+        verifyResultsWhenReScaling(sink3, 11, 31);
+
+        testHarness2.close();
+        testHarness3.close();
+    }
+}

Reply via email to