gaoyunhaii commented on a change in pull request #13678: URL: https://github.com/apache/flink/pull/13678#discussion_r507612814
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateTest.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Test {@link StreamingCommitterState}. + */ +public class StreamingCommitterStateTest { + + @Test + public void constructFromMap() { + final NavigableMap<Long, List<Integer>> r = new TreeMap<>(); + final List<Integer> expectedList = Arrays.asList( + 0, Review comment: I would prefer to put the numbers on one line. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializerTest.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test {@link StreamingCommitterStateSerializer}. 
+ */ +public class StreamingCommitterStateSerializerTest { + + @Test + public void serializeNonEmptyState() throws IOException { + final StreamingCommitterState<String> expectedStreamingCommitterState = + new StreamingCommitterState<>(Arrays.asList("city", "great", "temper", "valley")); + final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer = + new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE); + + final byte[] serialize = streamingCommitterStateSerializer.serialize( + expectedStreamingCommitterState); + final StreamingCommitterState<String> streamingCommitterState = + streamingCommitterStateSerializer.deserialize( + streamingCommitterStateSerializer.getVersion(), + serialize); + + assertThat( + streamingCommitterState.getCommittables(), + equalTo( + expectedStreamingCommitterState.getCommittables())); + } + + @Test + public void serializeEmptyState() throws IOException { + final StreamingCommitterState<String> expectedStreamingCommitterState = + new StreamingCommitterState<>(Collections.emptyList()); + final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer = + new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE); + + final byte[] serialize = streamingCommitterStateSerializer.serialize( + expectedStreamingCommitterState); + final StreamingCommitterState<String> streamingCommitterState = + streamingCommitterStateSerializer.deserialize( + streamingCommitterStateSerializer.getVersion(), + serialize); + + assertThat( Review comment: Simplified to `assertEquals(expectedStreamingCommitterState, streamingCommitterState);` ? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateTest.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; + +/** + * Test {@link StreamingCommitterState}. + */ +public class StreamingCommitterStateTest { + + @Test + public void constructFromMap() { + final NavigableMap<Long, List<Integer>> r = new TreeMap<>(); + final List<Integer> expectedList = Arrays.asList( + 0, + 1, + 2, + 3, + 10, + 11, + 12, + 13, + 30, + 31, + 32, + 33); + + r.put(1L, Arrays.asList(10, 11, 12, 13)); + r.put(0L, Arrays.asList(0, 1, 2, 3)); + r.put(3L, Arrays.asList(30, 31, 32, 33)); + + final StreamingCommitterState<Integer> streamingCommitterState = new StreamingCommitterState<>( + r); + + Assert.assertArrayEquals( Review comment: May be simplified as `Assert.assertEquals(expectedList, streamingCommitterState.getCommittables());` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link 
AbstractWriterOperator}. + */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( Review comment: Might be simplified to `assertTrue`. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link 
AbstractWriterOperator}. + */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + Review comment: Remove the empty line ? 
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterStateSerializerTest.java ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; + +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Test {@link StreamingCommitterStateSerializer}. 
+ */ +public class StreamingCommitterStateSerializerTest { + + @Test + public void serializeNonEmptyState() throws IOException { + final StreamingCommitterState<String> expectedStreamingCommitterState = + new StreamingCommitterState<>(Arrays.asList("city", "great", "temper", "valley")); + final StreamingCommitterStateSerializer<String> streamingCommitterStateSerializer = + new StreamingCommitterStateSerializer<>(SimpleVersionedStringSerializer.INSTANCE); + + final byte[] serialize = streamingCommitterStateSerializer.serialize( + expectedStreamingCommitterState); + final StreamingCommitterState<String> streamingCommitterState = + streamingCommitterStateSerializer.deserialize( + streamingCommitterStateSerializer.getVersion(), + serialize); + + assertThat( Review comment: Simplified to `assertEquals(expectedStreamingCommitterState, streamingCommitterState);` ? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link AbstractWriterOperator}. 
+ */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + + final List<String> input1 = Arrays.asList("today", "whom"); + final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1); + + final List<String> input2 = Arrays.asList("future", "evil", "how"); + final OperatorSubtaskState operatorSubtaskState2 = 
buildSubtaskState(input2); + + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + + final OperatorSubtaskState mergedOperatorSubtaskState = + OneInputStreamOperatorTestHarness.repackageState( + operatorSubtaskState1, + operatorSubtaskState2); + + testHarness.initializeState(mergedOperatorSubtaskState); + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator(); + + final List<String> expectedStates = new ArrayList<>(); + expectedStates.addAll(getExpectedCommittables(input1, testHarness)); + expectedStates.addAll(getExpectedCommittables(input2, testHarness)); + assertThat( + expectedStates, + equalTo( + streamingCommitterOperator.getCurrentCommittables())); + testHarness.close(); + } + + @Test + public void commitMultipleStagesTogether() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedOutput = new ArrayList<>(); + expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar"))); + + completeCheckpoint(testHarness, 3, expectedOutput); + + testHarness.close(); + } + + /** + * Process the input and return the snapshot. 
+ */ + protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> committables = getExpectedCommittables(input, testHarness); + process(testHarness, input); + final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables); + testHarness.close(); + + return operatorSubtaskState; + } + + private List<String> getExpectedCommittables( + List<String> committables, + OneInputStreamOperatorTestHarness<String, String> testHarness) { + final AbstractTestCommitter<String> committer = getCommitter(testHarness); + if (committer instanceof GlobalCommitter) { + final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer; + return Collections.singletonList(globalCommitter.combine(committables)); + } + return committables; + } + + // ------------------------ The sub class should override or implement following method ---------------------------- + + abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator( + TestSink<?, String, String, String> sink); + + GlobalCommitter<String, String> createGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createAlwaysRetryCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + // -------------------------------- OneInputStreamOperatorTestHarness Factory -------------------------------------- + + protected OneInputStreamOperatorTestHarness<String, String> 
createTestHarness() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + protected OneInputStreamOperatorTestHarness<String, String> createTestHarness( + OperatorSubtaskState operatorSubtaskState) throws Exception { + + final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + operatorTestHarness.initializeState(operatorSubtaskState); + return operatorTestHarness; + } + + OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception { + + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createAlwaysRetryCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createAlwaysRetryGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.empty(), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.empty())), 
StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + /** + * Base class for out testing {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}. + */ + abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> { + + protected List<CommT> committedData; + + private boolean isClosed; + + public AbstractTestCommitter() { + this.committedData = new ArrayList<>(); + this.isClosed = false; + } + + public List<CommT> getCommittedData() { + return committedData; + } + + @Override + public void close() throws Exception { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + // ------------------------------------------- Utils for testing --------------------------------------------------- + + /** + * Process the input and return the pre-commit results. 
+ */ + private List<String> preCommit( + OneInputStreamOperatorTestHarness<String, String> testHarness, + long checkpointId, + List<String> input) throws Exception { + + process(testHarness, input); + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + snapshot(testHarness, checkpointId, 1L, expectedCommittables); + return expectedCommittables; + } + + private <InputT, CommT> void process( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + List<InputT> inputs) { + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness + .getOneInputOperator(); + final List<StreamRecord<InputT>> inputStreamRecords = inputs + .stream() + .map(StreamRecord::new) + .collect(Collectors.toList()); + + // verify currently received committables + inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement)); + + Assert.assertArrayEquals( + inputs.toArray(), + streamingCommitterOperator.getCurrentInputs().toArray()); + } + + <InputT, CommT> OperatorSubtaskState snapshot( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + long checkpointId, + long checkpointTimestamp, + List<CommT> expectedCommittables) throws Exception { + + final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator(); + + final List<CommT> expectedState = + streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + expectedState.addAll(expectedCommittables); + + final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>( + streamingCommitterOperator.getCommittablesPerCheckpoint()); + final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList( + new 
StreamingCommitterState<>(expectedState)); + + expectedCommittablesPerCheckpoint.put( + checkpointId, + streamingCommitterOperator.getCurrentCommittables()); + + final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot( + checkpointId, + checkpointTimestamp); + + // verify current committables are cleaned up after checkpointing + assertThat( + streamingCommitterOperator.getCurrentInputs(), + is( + empty())); + + // verify the state + final List<StreamingCommitterState<CommT>> operatorState = + CollectionUtil.iterableToList(streamingCommitterOperator.getState().get()); + + assertThat( + operatorState, + equalTo( + expectedStreamingCommitterState)); + + // verify committables per checkpoint + assertThat( + streamingCommitterOperator.getCommittablesPerCheckpoint(), + equalTo( + expectedCommittablesPerCheckpoint)); + return operatorSubtaskState; + } + + <InputT, CommT> void completeCheckpoint( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + long checkpoint, + List<CommT> expectedOutput) throws Exception { + + final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator(); + + final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = + new TreeMap<>(streamingCommitterOperator.getCommittablesPerCheckpoint()); + + expectedCommittablesPerCheckpoint.entrySet().removeIf(it -> it.getKey() <= checkpoint); + final List<CommT> sinkOutput = getCommitter(testHarness).getCommittedData(); + + testHarness.notifyOfCompletedCheckpoint(checkpoint); + + // verify committables per checkpoint + assertThat( Review comment: Similarly might change to `assertEquals` for these three asserts ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for 
subclasses of {@link AbstractWriterOperator}. + */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + + final List<String> input1 = Arrays.asList("today", "whom"); + final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1); + + final List<String> input2 = Arrays.asList("future", "evil", "how"); + final 
OperatorSubtaskState operatorSubtaskState2 = buildSubtaskState(input2); + + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + + final OperatorSubtaskState mergedOperatorSubtaskState = + OneInputStreamOperatorTestHarness.repackageState( + operatorSubtaskState1, + operatorSubtaskState2); + + testHarness.initializeState(mergedOperatorSubtaskState); + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator(); + + final List<String> expectedStates = new ArrayList<>(); + expectedStates.addAll(getExpectedCommittables(input1, testHarness)); + expectedStates.addAll(getExpectedCommittables(input2, testHarness)); + assertThat( + expectedStates, + equalTo( + streamingCommitterOperator.getCurrentCommittables())); + testHarness.close(); + } + + @Test + public void commitMultipleStagesTogether() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedOutput = new ArrayList<>(); + expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar"))); + + completeCheckpoint(testHarness, 3, expectedOutput); + + testHarness.close(); + } + + /** + * Process the input and return the snapshot. 
+ */ + protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> committables = getExpectedCommittables(input, testHarness); + process(testHarness, input); + final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables); + testHarness.close(); + + return operatorSubtaskState; + } + + private List<String> getExpectedCommittables( + List<String> committables, + OneInputStreamOperatorTestHarness<String, String> testHarness) { + final AbstractTestCommitter<String> committer = getCommitter(testHarness); + if (committer instanceof GlobalCommitter) { + final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer; + return Collections.singletonList(globalCommitter.combine(committables)); + } + return committables; + } + + // ------------------------ The sub class should override or implement following method ---------------------------- + + abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator( + TestSink<?, String, String, String> sink); + + GlobalCommitter<String, String> createGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createAlwaysRetryCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + // -------------------------------- OneInputStreamOperatorTestHarness Factory -------------------------------------- + + protected OneInputStreamOperatorTestHarness<String, String> 
createTestHarness() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + protected OneInputStreamOperatorTestHarness<String, String> createTestHarness( + OperatorSubtaskState operatorSubtaskState) throws Exception { + + final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + operatorTestHarness.initializeState(operatorSubtaskState); + return operatorTestHarness; + } + + OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception { + + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createAlwaysRetryCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createAlwaysRetryGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.empty(), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.empty())), 
StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + /** + * Base class for our testing {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}. + */ + abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> { + + protected List<CommT> committedData; + + private boolean isClosed; + + public AbstractTestCommitter() { + this.committedData = new ArrayList<>(); + this.isClosed = false; + } + + public List<CommT> getCommittedData() { + return committedData; + } + + @Override + public void close() throws Exception { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + // ------------------------------------------- Utils for testing --------------------------------------------------- + + /** + * Process the input and return the pre-commit results. 
+ */ + private List<String> preCommit( + OneInputStreamOperatorTestHarness<String, String> testHarness, + long checkpointId, + List<String> input) throws Exception { + + process(testHarness, input); + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + snapshot(testHarness, checkpointId, 1L, expectedCommittables); + return expectedCommittables; + } + + private <InputT, CommT> void process( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + List<InputT> inputs) { + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness + .getOneInputOperator(); + final List<StreamRecord<InputT>> inputStreamRecords = inputs + .stream() + .map(StreamRecord::new) + .collect(Collectors.toList()); + + // verify currently received committables + inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement)); + + Assert.assertArrayEquals( Review comment: might change to `assertEquals` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link AbstractWriterOperator}. 
+ */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + + final List<String> input1 = Arrays.asList("today", "whom"); + final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1); + + final List<String> input2 = Arrays.asList("future", "evil", "how"); + final OperatorSubtaskState operatorSubtaskState2 = 
buildSubtaskState(input2); + + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + + final OperatorSubtaskState mergedOperatorSubtaskState = + OneInputStreamOperatorTestHarness.repackageState( + operatorSubtaskState1, + operatorSubtaskState2); + + testHarness.initializeState(mergedOperatorSubtaskState); + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator(); + + final List<String> expectedStates = new ArrayList<>(); + expectedStates.addAll(getExpectedCommittables(input1, testHarness)); + expectedStates.addAll(getExpectedCommittables(input2, testHarness)); + assertThat( + expectedStates, + equalTo( + streamingCommitterOperator.getCurrentCommittables())); + testHarness.close(); + } + + @Test + public void commitMultipleStagesTogether() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedOutput = new ArrayList<>(); + expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar"))); + + completeCheckpoint(testHarness, 3, expectedOutput); + + testHarness.close(); + } + + /** + * Process the input and return the snapshot. 
+ */ + protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> committables = getExpectedCommittables(input, testHarness); + process(testHarness, input); + final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables); + testHarness.close(); + + return operatorSubtaskState; + } + + private List<String> getExpectedCommittables( + List<String> committables, + OneInputStreamOperatorTestHarness<String, String> testHarness) { + final AbstractTestCommitter<String> committer = getCommitter(testHarness); + if (committer instanceof GlobalCommitter) { + final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer; + return Collections.singletonList(globalCommitter.combine(committables)); + } + return committables; + } + + // ------------------------ The sub class should override or implement following method ---------------------------- + + abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator( + TestSink<?, String, String, String> sink); + + GlobalCommitter<String, String> createGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createAlwaysRetryCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + // -------------------------------- OneInputStreamOperatorTestHarness Factory -------------------------------------- + + protected OneInputStreamOperatorTestHarness<String, String> 
createTestHarness() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + protected OneInputStreamOperatorTestHarness<String, String> createTestHarness( + OperatorSubtaskState operatorSubtaskState) throws Exception { + + final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + operatorTestHarness.initializeState(operatorSubtaskState); + return operatorTestHarness; + } + + OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception { + + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createAlwaysRetryCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createAlwaysRetryGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.empty(), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.empty())), 
StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + /** + * Base class for our testing {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}. + */ + abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> { + + protected List<CommT> committedData; + + private boolean isClosed; + + public AbstractTestCommitter() { + this.committedData = new ArrayList<>(); + this.isClosed = false; + } + + public List<CommT> getCommittedData() { + return committedData; + } + + @Override + public void close() throws Exception { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + // ------------------------------------------- Utils for testing --------------------------------------------------- + + /** + * Process the input and return the pre-commit results. 
+ */ + private List<String> preCommit( + OneInputStreamOperatorTestHarness<String, String> testHarness, + long checkpointId, + List<String> input) throws Exception { + + process(testHarness, input); + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + snapshot(testHarness, checkpointId, 1L, expectedCommittables); + return expectedCommittables; + } + + private <InputT, CommT> void process( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + List<InputT> inputs) { + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness + .getOneInputOperator(); + final List<StreamRecord<InputT>> inputStreamRecords = inputs + .stream() + .map(StreamRecord::new) + .collect(Collectors.toList()); + + // verify currently received committables + inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement)); + + Assert.assertArrayEquals( + inputs.toArray(), + streamingCommitterOperator.getCurrentInputs().toArray()); + } + + <InputT, CommT> OperatorSubtaskState snapshot( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + long checkpointId, + long checkpointTimestamp, + List<CommT> expectedCommittables) throws Exception { + + final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator(); + + final List<CommT> expectedState = + streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + expectedState.addAll(expectedCommittables); + + final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>( + streamingCommitterOperator.getCommittablesPerCheckpoint()); + final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList( + new 
StreamingCommitterState<>(expectedState)); + + expectedCommittablesPerCheckpoint.put( + checkpointId, + streamingCommitterOperator.getCurrentCommittables()); + + final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot( + checkpointId, + checkpointTimestamp); + + // verify current committables are cleaned up after checkpointing + assertThat( + streamingCommitterOperator.getCurrentInputs(), + is( + empty())); + + // verify the state + final List<StreamingCommitterState<CommT>> operatorState = + CollectionUtil.iterableToList(streamingCommitterOperator.getState().get()); + + assertThat( Review comment: might change to `assertEquals` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link AbstractWriterOperator}. 
+ */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + + final List<String> input1 = Arrays.asList("today", "whom"); + final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1); + + final List<String> input2 = Arrays.asList("future", "evil", "how"); + final OperatorSubtaskState operatorSubtaskState2 = 
buildSubtaskState(input2); + + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + + final OperatorSubtaskState mergedOperatorSubtaskState = + OneInputStreamOperatorTestHarness.repackageState( + operatorSubtaskState1, + operatorSubtaskState2); + + testHarness.initializeState(mergedOperatorSubtaskState); + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator(); + + final List<String> expectedStates = new ArrayList<>(); + expectedStates.addAll(getExpectedCommittables(input1, testHarness)); + expectedStates.addAll(getExpectedCommittables(input2, testHarness)); + assertThat( + expectedStates, + equalTo( + streamingCommitterOperator.getCurrentCommittables())); + testHarness.close(); + } + + @Test + public void commitMultipleStagesTogether() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedOutput = new ArrayList<>(); + expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar"))); + + completeCheckpoint(testHarness, 3, expectedOutput); + + testHarness.close(); + } + + /** + * Process the input and return the snapshot. 
+ */ + protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> committables = getExpectedCommittables(input, testHarness); + process(testHarness, input); + final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables); + testHarness.close(); + + return operatorSubtaskState; + } + + private List<String> getExpectedCommittables( + List<String> committables, + OneInputStreamOperatorTestHarness<String, String> testHarness) { + final AbstractTestCommitter<String> committer = getCommitter(testHarness); + if (committer instanceof GlobalCommitter) { + final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer; + return Collections.singletonList(globalCommitter.combine(committables)); + } + return committables; + } + + // ------------------------ The sub class should override or implement following method ---------------------------- + + abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator( + TestSink<?, String, String, String> sink); + + GlobalCommitter<String, String> createGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createAlwaysRetryCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + // -------------------------------- OneInputStreamOperatorTestHarness Factory -------------------------------------- + + protected OneInputStreamOperatorTestHarness<String, String> 
createTestHarness() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + protected OneInputStreamOperatorTestHarness<String, String> createTestHarness( + OperatorSubtaskState operatorSubtaskState) throws Exception { + + final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + operatorTestHarness.initializeState(operatorSubtaskState); + return operatorTestHarness; + } + + OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception { + + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createAlwaysRetryCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createAlwaysRetryGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.empty(), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.empty())), 
StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + /** + * Base class for out testing {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}. + */ + abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> { + + protected List<CommT> committedData; + + private boolean isClosed; + + public AbstractTestCommitter() { + this.committedData = new ArrayList<>(); + this.isClosed = false; + } + + public List<CommT> getCommittedData() { + return committedData; + } + + @Override + public void close() throws Exception { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + // ------------------------------------------- Utils for testing --------------------------------------------------- + + /** + * Process the input and return the pre-commit results. 
+ */ + private List<String> preCommit( + OneInputStreamOperatorTestHarness<String, String> testHarness, + long checkpointId, + List<String> input) throws Exception { + + process(testHarness, input); + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + snapshot(testHarness, checkpointId, 1L, expectedCommittables); + return expectedCommittables; + } + + private <InputT, CommT> void process( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + List<InputT> inputs) { + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness + .getOneInputOperator(); + final List<StreamRecord<InputT>> inputStreamRecords = inputs + .stream() + .map(StreamRecord::new) + .collect(Collectors.toList()); + + // verify currently received committables + inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement)); + + Assert.assertArrayEquals( + inputs.toArray(), + streamingCommitterOperator.getCurrentInputs().toArray()); + } + + <InputT, CommT> OperatorSubtaskState snapshot( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + long checkpointId, + long checkpointTimestamp, + List<CommT> expectedCommittables) throws Exception { + + final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator(); + + final List<CommT> expectedState = + streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + expectedState.addAll(expectedCommittables); + + final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>( + streamingCommitterOperator.getCommittablesPerCheckpoint()); + final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList( + new 
StreamingCommitterState<>(expectedState)); + + expectedCommittablesPerCheckpoint.put( + checkpointId, + streamingCommitterOperator.getCurrentCommittables()); + + final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot( + checkpointId, + checkpointTimestamp); + + // verify current committables are cleaned up after checkpointing + assertThat( + streamingCommitterOperator.getCurrentInputs(), + is( + empty())); + + // verify the state + final List<StreamingCommitterState<CommT>> operatorState = + CollectionUtil.iterableToList(streamingCommitterOperator.getState().get()); + + assertThat( + operatorState, + equalTo( + expectedStreamingCommitterState)); + + // verify committables per checkpoint + assertThat( Review comment: might change to `assertEquals` ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/StreamingCommitterTestBase.java ########## @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +/** + * Base class for Tests for subclasses of {@link AbstractWriterOperator}. 
+ */ +public abstract class StreamingCommitterTestBase extends TestLogger { + + private static final TestSink.DefaultWriter<String> DEFAULT_WRITER = new TestSink.DefaultWriter<>(); + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutSerializer() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutSerializer(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwExceptionWithoutCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createWithoutCommitter(); + testHarness.initializeEmptyState(); + testHarness.open(); + } + + @Test(expected = UnsupportedOperationException.class) + public void doNotSupportRetry() throws Exception { + final List<String> input = Arrays.asList("lazy", "leaf"); + final OneInputStreamOperatorTestHarness<String, String> testHarness = createAlwaysRetryTestHarness(); + + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + + process(testHarness, input); + snapshot(testHarness, 1, 1, expectedCommittables); + completeCheckpoint(testHarness, 1, expectedCommittables); + + testHarness.close(); + } + + @Test + public void closeCommitter() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + testHarness.close(); + assertThat( + getCommitter(testHarness).isClosed(), + is(true)); + } + + @Test + public void restoredFromMergedState() throws Exception { + + final List<String> input1 = Arrays.asList("today", "whom"); + final OperatorSubtaskState operatorSubtaskState1 = buildSubtaskState(input1); + + final List<String> input2 = Arrays.asList("future", "evil", "how"); + final OperatorSubtaskState operatorSubtaskState2 = 
buildSubtaskState(input2); + + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + + final OperatorSubtaskState mergedOperatorSubtaskState = + OneInputStreamOperatorTestHarness.repackageState( + operatorSubtaskState1, + operatorSubtaskState2); + + testHarness.initializeState(mergedOperatorSubtaskState); + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<String, String>) testHarness.getOneInputOperator(); + + final List<String> expectedStates = new ArrayList<>(); + expectedStates.addAll(getExpectedCommittables(input1, testHarness)); + expectedStates.addAll(getExpectedCommittables(input2, testHarness)); + assertThat( + expectedStates, + equalTo( + streamingCommitterOperator.getCurrentCommittables())); + testHarness.close(); + } + + @Test + public void commitMultipleStagesTogether() throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> expectedOutput = new ArrayList<>(); + expectedOutput.addAll(preCommit(testHarness, 1L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 2L, Arrays.asList("cautious", "nature"))); + expectedOutput.addAll(preCommit(testHarness, 3L, Arrays.asList("lawyer", "grammar"))); + + completeCheckpoint(testHarness, 3, expectedOutput); + + testHarness.close(); + } + + /** + * Process the input and return the snapshot. 
+ */ + protected OperatorSubtaskState buildSubtaskState(List<String> input) throws Exception { + final OneInputStreamOperatorTestHarness<String, String> testHarness = createTestHarness(); + testHarness.initializeEmptyState(); + testHarness.open(); + + final List<String> committables = getExpectedCommittables(input, testHarness); + process(testHarness, input); + final OperatorSubtaskState operatorSubtaskState = snapshot(testHarness, 1, 1, committables); + testHarness.close(); + + return operatorSubtaskState; + } + + private List<String> getExpectedCommittables( + List<String> committables, + OneInputStreamOperatorTestHarness<String, String> testHarness) { + final AbstractTestCommitter<String> committer = getCommitter(testHarness); + if (committer instanceof GlobalCommitter) { + final GlobalCommitter<String, String> globalCommitter = (GlobalCommitter<String, String>) committer; + return Collections.singletonList(globalCommitter.combine(committables)); + } + return committables; + } + + // ------------------------ The sub class should override or implement following method ---------------------------- + + abstract AbstractStreamingCommitterOperatorFactory<String, String> createStreamingCommitterOperator( + TestSink<?, String, String, String> sink); + + GlobalCommitter<String, String> createGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + GlobalCommitter<String, String> createAlwaysRetryGlobalCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + Committer<String> createAlwaysRetryCommitter() { + throw new RuntimeException("Sub class should override this method"); + } + + // -------------------------------- OneInputStreamOperatorTestHarness Factory -------------------------------------- + + protected OneInputStreamOperatorTestHarness<String, String> 
createTestHarness() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + protected OneInputStreamOperatorTestHarness<String, String> createTestHarness( + OperatorSubtaskState operatorSubtaskState) throws Exception { + + final OneInputStreamOperatorTestHarness<String, String> operatorTestHarness = + new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + operatorTestHarness.initializeState(operatorSubtaskState); + return operatorTestHarness; + } + + OneInputStreamOperatorTestHarness<String, String> createAlwaysRetryTestHarness() throws Exception { + + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createAlwaysRetryCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.of(createAlwaysRetryGlobalCommitter()), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutSerializer() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.of(createCommitter()), + () -> Optional.empty(), + () -> Optional.of(createGlobalCommitter()), + () -> Optional.empty())), 
StringSerializer.INSTANCE); + } + + OneInputStreamOperatorTestHarness<String, String> createWithoutCommitter() throws Exception { + return new OneInputStreamOperatorTestHarness<>( + createStreamingCommitterOperator( + TestSink.create( + () -> DEFAULT_WRITER, + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE), + () -> Optional.empty(), + () -> Optional.of(SimpleVersionedStringSerializer.INSTANCE))), + StringSerializer.INSTANCE); + } + + /** + * Base class for out testing {@link Committer} and {@link org.apache.flink.api.connector.sink.GlobalCommitter}. + */ + abstract static class AbstractTestCommitter<CommT> implements Committer<CommT> { + + protected List<CommT> committedData; + + private boolean isClosed; + + public AbstractTestCommitter() { + this.committedData = new ArrayList<>(); + this.isClosed = false; + } + + public List<CommT> getCommittedData() { + return committedData; + } + + @Override + public void close() throws Exception { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + // ------------------------------------------- Utils for testing --------------------------------------------------- + + /** + * Process the input and return the pre-commit results. 
+ */ + private List<String> preCommit( + OneInputStreamOperatorTestHarness<String, String> testHarness, + long checkpointId, + List<String> input) throws Exception { + + process(testHarness, input); + final List<String> expectedCommittables = getExpectedCommittables(input, testHarness); + snapshot(testHarness, checkpointId, 1L, expectedCommittables); + return expectedCommittables; + } + + private <InputT, CommT> void process( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + List<InputT> inputs) { + + final AbstractStreamingCommitterOperator<String, String> streamingCommitterOperator = (AbstractStreamingCommitterOperator<String, String>) testHarness + .getOneInputOperator(); + final List<StreamRecord<InputT>> inputStreamRecords = inputs + .stream() + .map(StreamRecord::new) + .collect(Collectors.toList()); + + // verify currently received committables + inputStreamRecords.forEach(FunctionUtils.uncheckedConsumer(testHarness::processElement)); + + Assert.assertArrayEquals( + inputs.toArray(), + streamingCommitterOperator.getCurrentInputs().toArray()); + } + + <InputT, CommT> OperatorSubtaskState snapshot( + OneInputStreamOperatorTestHarness<InputT, CommT> testHarness, + long checkpointId, + long checkpointTimestamp, + List<CommT> expectedCommittables) throws Exception { + + final AbstractStreamingCommitterOperator<InputT, CommT> streamingCommitterOperator = + (AbstractStreamingCommitterOperator<InputT, CommT>) testHarness.getOneInputOperator(); + + final List<CommT> expectedState = + streamingCommitterOperator.getCommittablesPerCheckpoint().values().stream() + .flatMap(Collection::stream) + .collect(Collectors.toList()); + expectedState.addAll(expectedCommittables); + + final NavigableMap<Long, List<CommT>> expectedCommittablesPerCheckpoint = new TreeMap<>( + streamingCommitterOperator.getCommittablesPerCheckpoint()); + final List<StreamingCommitterState<CommT>> expectedStreamingCommitterState = Collections.singletonList( + new 
StreamingCommitterState<>(expectedState)); + + expectedCommittablesPerCheckpoint.put( + checkpointId, + streamingCommitterOperator.getCurrentCommittables()); + + final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot( + checkpointId, + checkpointTimestamp); + + // verify current committables are cleaned up after checkpointing + assertThat( + streamingCommitterOperator.getCurrentInputs(), + is( Review comment: No need to break the line here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
