Repository: bahir-flink
Updated Branches:
  refs/heads/master ca795cc74 -> c760e3cfb


http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
----------------------------------------------------------------------
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
new file mode 100644
index 0000000..41e59b7
--- /dev/null
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/streaming/connectors/kudu/connector/KuduDatabase.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kudu.connector;
+
+import org.apache.kudu.Type;
+import org.junit.jupiter.api.Assertions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Shared fixture for the Kudu connector tests: a small "books" data set plus
+ * helpers that describe the table, load the rows and drop the table again.
+ */
+public class KuduDatabase {
+
+    /** Host handed to every {@link KuduConnector} this fixture opens. */
+    protected static final String hostsCluster = "172.25.0.6";
+
+    /** Sample rows; each entry is {id, title, author, price, quantity}. */
+    protected static final Object[][] booksTableData = {
+            {1001, "Java for dummies", "Tan Ah Teck", 11.11, 11},
+            {1002, "More Java for dummies", "Tan Ah Teck", 22.22, 22},
+            {1003, "More Java for more dummies", "Mohammad Ali", 33.33, 33},
+            {1004, "A Cup of Java", "Kumar", 44.44, 44},
+            {1005, "A Teaspoon of Java", "Kevin Jones", 55.55, 55}};
+
+    /**
+     * Describes a single-replica table whose columns mirror {@link #booksTableData};
+     * {@code id} is flagged as both key and hash key.
+     *
+     * @param tableName        name of the table
+     * @param createIfNotExist value forwarded to the builder's {@code createIfNotExist}
+     * @return the table description
+     */
+    protected static KuduTableInfo booksTableInfo(String tableName, boolean createIfNotExist) {
+        return KuduTableInfo.Builder
+                .create(tableName)
+                .createIfNotExist(createIfNotExist)
+                .replicas(1)
+                .addColumn(KuduColumnInfo.Builder.create("id", Type.INT32).key(true).hashKey(true).build())
+                .addColumn(KuduColumnInfo.Builder.create("title", Type.STRING).build())
+                .addColumn(KuduColumnInfo.Builder.create("author", Type.STRING).build())
+                .addColumn(KuduColumnInfo.Builder.create("price", Type.DOUBLE).build())
+                .addColumn(KuduColumnInfo.Builder.create("quantity", Type.INT32).build())
+                .build();
+    }
+
+    /**
+     * Converts every entry of {@link #booksTableData} into a five-field {@link KuduRow}.
+     *
+     * @return one row per sample book, in declaration order
+     */
+    protected static List<KuduRow> booksDataRow() {
+        return Arrays.stream(booksTableData)
+                .map(row -> {
+                    KuduRow values = new KuduRow(5);
+                    values.setField(0, "id", row[0]);
+                    values.setField(1, "title", row[1]);
+                    values.setField(2, "author", row[2]);
+                    values.setField(3, "price", row[3]);
+                    values.setField(4, "quantity", row[4]);
+                    return values;
+                })
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Upserts all rows of {@link #booksDataRow()} into the given table and fails the
+     * calling test if the connector cannot be opened, a row cannot be written or the
+     * connector cannot be closed.
+     *
+     * @param tableInfo description of the table to populate
+     */
+    public void setUpDatabase(KuduTableInfo tableInfo) {
+        try {
+            KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo);
+            try {
+                for (KuduRow row : booksDataRow()) {
+                    tableContext.writeRow(row,
+                            KuduConnector.Consistency.STRONG, KuduConnector.WriteMode.UPSERT);
+                }
+            } finally {
+                // Release the connection even when a write fails.
+                tableContext.close();
+            }
+        } catch (Exception e) {
+            // Surface the cause instead of only printing it, so tests never run on partial data.
+            Assertions.fail("Unable to load the books data into Kudu", e);
+        }
+    }
+
+    /**
+     * Drops the given table and fails the calling test if that is not possible.
+     *
+     * @param tableInfo description of the table to delete
+     */
+    protected static void cleanDatabase(KuduTableInfo tableInfo) {
+        try {
+            KuduConnector tableContext = new KuduConnector(hostsCluster, tableInfo);
+            try {
+                tableContext.deleteTable();
+            } finally {
+                // Close the connection even when the delete fails.
+                tableContext.close();
+            }
+        } catch (Exception e) {
+            Assertions.fail("Unable to delete the Kudu table", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bahir-flink/blob/c760e3cf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d2d435c..f052045 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
     <module>flink-connector-akka</module>
     <module>flink-connector-influxdb</module>
     <module>flink-library-siddhi</module>
+    <module>flink-connector-kudu</module>
   </modules>
 
   <properties>
@@ -85,7 +86,7 @@
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
     <!-- General project dependencies version -->
-    <java.version>1.7</java.version>
+    <java.version>1.8</java.version>
     <scala.version>2.11.8</scala.version>
     <scala.binary.version>2.11</scala.binary.version>
 

Reply via email to