AHeise commented on a change in pull request #15109:
URL: https://github.com/apache/flink/pull/15109#discussion_r590062671



##########
File path: flink-connectors/flink-connector-hbase/src/test/java/org/apache/flink/connector/hbase/source/HBaseSourceITCase.java
##########
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.hbase.source.reader.HBaseEvent;
+import org.apache.flink.connector.hbase.source.reader.HBaseSourceDeserializer;
+import org.apache.flink.connector.hbase.testutil.FailureSink;
+import org.apache.flink.connector.hbase.testutil.HBaseTestClusterUtil;
+import org.apache.flink.connector.hbase.testutil.Util;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+
+import org.apache.hadoop.hbase.client.Put;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.connector.hbase.testutil.FileSignal.awaitSignalThrowOnFailure;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.awaitSuccess;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.cleanupFolder;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.cleanupSignal;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.makeFolder;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.signal;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.signalFailure;
+import static org.apache.flink.connector.hbase.testutil.FileSignal.signalSuccess;
+import static org.junit.Assert.assertArrayEquals;
+
+/** Tests the most basic use cases of the source with a mocked HBase system. */
+public class HBaseSourceITCase extends TestsWithTestHBaseCluster {
+
+    private DataStream<String> streamFromHBaseSource(
+            StreamExecutionEnvironment environment, String tableName)
+            throws ParserConfigurationException, SAXException, IOException {
+        return streamFromHBaseSource(environment, tableName, 1);
+    }
+
+    private DataStream<String> streamFromHBaseSource(
+            StreamExecutionEnvironment environment, String tableName, int parallelism)
+            throws ParserConfigurationException, SAXException, IOException {
+        HBaseStringDeserializationScheme deserializationScheme =
+                new HBaseStringDeserializationScheme();
+        HBaseSource<String> source =
+                new HBaseSource<>(deserializationScheme, tableName, cluster.getConfig());
+        environment.setParallelism(parallelism);
+        DataStream<String> stream =
+                environment.fromSource(
+                        source,
+                        WatermarkStrategy.noWatermarks(),
+                        "hbaseSourceITCase",
+                        deserializationScheme.getProducedType());
+        return stream;
+    }
+
+    private static <T> void expectFirstValuesToBe(
+            DataStream<T> stream, T[] expectedValues, String message) {
+
+        List<T> collectedValues = new ArrayList<>();
+        stream.flatMap(
+                new RichFlatMapFunction<T, Object>() {
+
+                    @Override
+                    public void flatMap(T value, Collector<Object> out) {
+                        System.out.println("Test collected: " + value);
+                        collectedValues.add(value);
+                        if (collectedValues.size() == expectedValues.length) {
+                            assertArrayEquals(message, expectedValues, collectedValues.toArray());
+                            throw new SuccessException();
+                        }
+                    }
+                });
+    }
+
+    private static void doAndWaitForSuccess(
+            StreamExecutionEnvironment env, Runnable action, int timeout) {
+        try {
+            JobClient jobClient = env.executeAsync();
+            MiniCluster miniCluster = Util.miniCluster((MiniClusterJobClient) jobClient);
+            Util.waitForClusterStart(miniCluster, true);
+
+            action.run();
+            jobClient.getJobExecutionResult().get(timeout, TimeUnit.SECONDS);
+            jobClient.cancel();
+            throw new RuntimeException("Waiting for the correct data timed out");
+        } catch (Exception exception) {
+            if (!causedBySuccess(exception)) {
+                throw new RuntimeException("Test failed", exception);
+            } else {
+                // Normal termination
+            }
+        }
+    }
+
+    @Before
+    public void makeSignalFolder() {
+        makeFolder();
+    }
+
+    @After
+    public void cleanupSignalFolder() throws IOException {
+        cleanupFolder();
+    }
+
+    @Test
+    public void testBasicPut() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> stream = streamFromHBaseSource(env, baseTableName);
+        cluster.makeTable(baseTableName, DEFAULT_CF_COUNT);
+        String[] expectedValues = uniqueValues(2 * DEFAULT_CF_COUNT);
+
+        expectFirstValuesToBe(
+                stream,
+                expectedValues,
+                "HBase source did not produce the right values after a basic 
put operation");
+        doAndWaitForSuccess(
+                env, () -> cluster.put(baseTableName, DEFAULT_CF_COUNT, expectedValues), 120);
+    }
+
+    @Test
+    public void testOnlyReplicateSpecifiedTable() throws Exception {
+        String secondTable = baseTableName + "-table2";
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        DataStream<String> stream = streamFromHBaseSource(env, baseTableName);
+        cluster.makeTable(baseTableName, DEFAULT_CF_COUNT);
+        cluster.makeTable(secondTable, DEFAULT_CF_COUNT);
+        String[] expectedValues = uniqueValues(DEFAULT_CF_COUNT);
+
+        expectFirstValuesToBe(
+                stream,
+                expectedValues,
+                "HBase source did not produce the values of the correct 
table");
+        doAndWaitForSuccess(
+                env,
+                () -> {
+                    cluster.put(secondTable, DEFAULT_CF_COUNT, uniqueValues(DEFAULT_CF_COUNT));
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    cluster.put(baseTableName, DEFAULT_CF_COUNT, expectedValues);
+                },
+                180);
+    }
+
+    @Test
+    public void testRecordsAreProducedExactlyOnceWithCheckpoints() throws Exception {
+        final String collectedValueSignal = "collectedValue";
+        String[] expectedValues = uniqueValues(20);
+        cluster.makeTable(baseTableName);
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(2000);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        DataStream<String> stream = streamFromHBaseSource(env, baseTableName);
+        stream.addSink(
+                new FailureSink<String>(true, 3500, TypeInformation.of(String.class)) {
+
+                    private void checkForSuccess() {
+                        List<String> checkpointed = getCheckpointedValues();
+                        System.out.println(unCheckpointedValues + " " + checkpointed);
+                        if (checkpointed.size() == expectedValues.length) {
+                            try {
+                                assertArrayEquals(
+                                        "Wrong values were produced.",
+                                        expectedValues,
+                                        checkpointed.toArray());
+                                signalSuccess();
+                            } catch (Exception e) {

Review comment:
   ```java
   catch (Exception e) {
       signalFailure();
       throw e;
   }
   ```
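
   To make the intent concrete, here is a sketch of the suggested catch applied to the `checkForSuccess` method from the snippet above (lightly condensed; adding `throws Exception` to the method is an assumption, needed so the rethrow of the checked exception compiles):

   ```java
   private void checkForSuccess() throws Exception {
       List<String> checkpointed = getCheckpointedValues();
       if (checkpointed.size() == expectedValues.length) {
           try {
               assertArrayEquals(
                       "Wrong values were produced.", expectedValues, checkpointed.toArray());
               signalSuccess();
           } catch (Exception e) {
               // Suggested change: signal the failure before rethrowing, so the
               // waiting side of the test is notified through the same FileSignal
               // mechanism that is used on success.
               signalFailure();
               throw e;
           }
       }
   }
   ```

   The point is simply to report the failure through the FileSignal helpers before rethrowing, rather than letting the exception end the sink without the waiting test ever learning why.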




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

