[FLINK-6281] [jdbc] Add JDBCAppendTableSink.

This closes #3712.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43e5a81d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43e5a81d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43e5a81d

Branch: refs/heads/master
Commit: 43e5a81d4e95f4f7b239ab90f12dfb66e7ae8a48
Parents: 1de8acc
Author: Haohui Mai <[email protected]>
Authored: Tue Apr 11 23:56:56 2017 -0700
Committer: Fabian Hueske <[email protected]>
Committed: Fri Aug 11 17:29:20 2017 +0200

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  33 ++++-
 flink-connectors/flink-jdbc/pom.xml             |  10 +-
 .../api/java/io/jdbc/JDBCAppendTableSink.java   | 120 ++++++++++++++++
 .../io/jdbc/JDBCAppendTableSinkBuilder.java     | 140 +++++++++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormat.java      |  41 +++---
 .../api/java/io/jdbc/JDBCSinkFunction.java      |  63 +++++++++
 .../flink/api/java/io/jdbc/JDBCTypeUtil.java    | 103 ++++++++++++++
 .../java/io/jdbc/JDBCAppendTableSinkTest.java   |  90 ++++++++++++
 .../api/java/io/jdbc/JDBCOutputFormatTest.java  |  71 ++++++++--
 .../flink/api/java/io/jdbc/JDBCTestBase.java    |   4 +
 .../api/java/io/jdbc/JDBCTypeUtilTest.java      |  52 +++++++
 11 files changed, 684 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7af74ca..53b93e1 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -202,7 +202,38 @@ val csvTableSource = CsvTableSource
 Provided TableSinks
 -------------------
 
-**TODO**
+### JDBCAppendSink
+
+<code>JDBCAppendSink</code> allows you to bridge the data stream to the JDBC 
driver. The sink only supports append-only data. It does not support 
retractions and upserts from Flink's perspectives. However, you can customize 
the query using <code>REPLACE</code> or <code>INSERT OVERWRITE</code> to 
implement upsert inside the database.
+
+To use the JDBC sink, you have to add the JDBC connector dependency 
(<code>flink-jdbc</code>) to your project. Then you can create the sink using 
<code>JDBCAppendSinkBuilder</code>:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val sink = JDBCAppendTableSink.builder()
+  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+  .setDBUrl("jdbc:derby:memory:ebookshop")
+  .setQuery("INSERT INTO books (id) VALUES (?)")
+  .setParameterTypes(INT_TYPE_INFO)
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
+Similar to using <code>JDBCOutputFormat</code>, you have to explicitly specify 
the name of the JDBC driver, the JDBC URL, the query to be executed, and the 
field types of the JDBC table. You can connect the sink with other 
<code>DataStream</code>s once the sink is constructed.
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-jdbc/pom.xml 
b/flink-connectors/flink-jdbc/pom.xml
index 0704dc8..938ec09 100644
--- a/flink-connectors/flink-jdbc/pom.xml
+++ b/flink-connectors/flink-jdbc/pom.xml
@@ -36,7 +36,6 @@ under the License.
        <packaging>jar</packaging>
 
        <dependencies>
-
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        
<artifactId>flink-table_${scala.binary.version}</artifactId>
@@ -49,19 +48,12 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
+                       
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <scope>provided</scope>
                </dependency>
 
                <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-clients_${scala.binary.version}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
                        <groupId>org.apache.derby</groupId>
                        <artifactId>derby</artifactId>
                        <version>10.10.1.1</version>

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
new file mode 100644
index 0000000..fc2d0a8
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * An at-least-once Table sink for JDBC.
+ *
+ * <p>The mechanisms of Flink guarantees delivering messages at-least-once to 
this sink (if
+ * checkpointing is enabled). However, one common use case is to run 
idempotent queries
+ * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert 
into the database and
+ * achieve exactly-once semantic.</p>
+ */
+public class JDBCAppendTableSink implements AppendStreamTableSink<Row>, 
BatchTableSink<Row> {
+
+       private final JDBCOutputFormat outputFormat;
+
+       private String[] fieldNames;
+       private TypeInformation[] fieldTypes;
+
+       JDBCAppendTableSink(JDBCOutputFormat outputFormat) {
+               this.outputFormat = outputFormat;
+       }
+
+       public static JDBCAppendTableSinkBuilder builder() {
+               return new JDBCAppendTableSinkBuilder();
+       }
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               dataStream.addSink(new JDBCSinkFunction(outputFormat));
+       }
+
+       @Override
+       public void emitDataSet(DataSet<Row> dataSet) {
+               dataSet.output(outputFormat);
+       }
+
+       @Override
+       public TypeInformation<Row> getOutputType() {
+               return new RowTypeInfo(fieldTypes, fieldNames);
+       }
+
+       @Override
+       public String[] getFieldNames() {
+               return fieldNames;
+       }
+
+       @Override
+       public TypeInformation<?>[] getFieldTypes() {
+               return fieldTypes;
+       }
+
+       @Override
+       public TableSink<Row> configure(String[] fieldNames, 
TypeInformation<?>[] fieldTypes) {
+               int[] types = outputFormat.getTypesArray();
+
+               String sinkSchema =
+                       String.join(", ", 
IntStream.of(types).mapToObj(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+               String tableSchema =
+                       String.join(", ", 
Stream.of(fieldTypes).map(JDBCTypeUtil::getTypeName).collect(Collectors.toList()));
+               String msg = String.format("Schema of output table is 
incompatible with JDBCAppendTableSink schema. " +
+                       "Table schema: [%s], sink schema: [%s]", tableSchema, 
sinkSchema);
+
+               Preconditions.checkArgument(fieldTypes.length == types.length, 
msg);
+               for (int i = 0; i < types.length; ++i) {
+                       Preconditions.checkArgument(
+                               
JDBCTypeUtil.typeInformationToSqlType(fieldTypes[i]) == types[i],
+                               msg);
+               }
+
+               JDBCAppendTableSink copy;
+               try {
+                       copy = new 
JDBCAppendTableSink(InstantiationUtil.clone(outputFormat));
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException(e);
+               }
+
+               copy.fieldNames = fieldNames;
+               copy.fieldTypes = fieldTypes;
+               return copy;
+       }
+
+       @VisibleForTesting
+       JDBCOutputFormat getOutputFormat() {
+               return outputFormat;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
new file mode 100644
index 0000000..da00d74
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.util.Preconditions;
+
+import static 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.DEFAULT_BATCH_INTERVAL;
+
+/**
+ * A builder to configure and build the JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkBuilder {
+       private String username;
+       private String password;
+       private String driverName;
+       private String dbURL;
+       private String query;
+       private int batchSize = DEFAULT_BATCH_INTERVAL;
+       private int[] parameterTypes;
+
+       /**
+        * Specify the username of the JDBC connection.
+        * @param username the username of the JDBC connection.
+        */
+       public JDBCAppendTableSinkBuilder setUsername(String username) {
+               this.username = username;
+               return this;
+       }
+
+       /**
+        * Specify the password of the JDBC connection.
+        * @param password the password of the JDBC connection.
+        */
+       public JDBCAppendTableSinkBuilder setPassword(String password) {
+               this.password = password;
+               return this;
+       }
+
+       /**
+        * Specify the name of the JDBC driver that will be used.
+        * @param drivername the name of the JDBC driver.
+        */
+       public JDBCAppendTableSinkBuilder setDrivername(String drivername) {
+               this.driverName = drivername;
+               return this;
+       }
+
+       /**
+        * Specify the URL of the JDBC database.
+        * @param dbURL the URL of the database, whose format is specified by 
the
+        *              corresponding JDBC driver.
+        */
+       public JDBCAppendTableSinkBuilder setDBUrl(String dbURL) {
+               this.dbURL = dbURL;
+               return this;
+       }
+
+       /**
+        * Specify the query that the sink will execute. Usually user can 
specify
+        * INSERT, REPLACE or UPDATE to push the data to the database.
+        * @param query The query to be executed by the sink.
+        * @see 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.JDBCOutputFormatBuilder#setQuery(String)
+        */
+       public JDBCAppendTableSinkBuilder setQuery(String query) {
+               this.query = query;
+               return this;
+       }
+
+       /**
+        * Specify the size of the batch. By default the sink will batch the 
query
+        * to improve the performance
+        * @param batchSize the size of batch
+        */
+       public JDBCAppendTableSinkBuilder setBatchSize(int batchSize) {
+               this.batchSize = batchSize;
+               return this;
+       }
+
+       /**
+        * Specify the type of the rows that the sink will be accepting.
+        * @param types the type of each field
+        */
+       public JDBCAppendTableSinkBuilder 
setParameterTypes(TypeInformation<?>... types) {
+               int[] ty = new int[types.length];
+               for (int i = 0; i < types.length; ++i) {
+                       ty[i] = JDBCTypeUtil.typeInformationToSqlType(types[i]);
+               }
+               this.parameterTypes = ty;
+               return this;
+       }
+
+       /**
+        * Specify the type of the rows that the sink will be accepting.
+        * @param types the type of each field defined by {@see java.sql.Types}.
+        */
+       public JDBCAppendTableSinkBuilder setParameterTypes(int... types) {
+               this.parameterTypes = types;
+               return this;
+       }
+
+       /**
+        * Finalizes the configuration and checks validity.
+        *
+        * @return Configured JDBCOutputFormat
+        */
+       public JDBCAppendTableSink build() {
+               Preconditions.checkNotNull(parameterTypes,
+                       "Types of the query parameters are not specified." +
+                       " Please specify types using the setParameterTypes() 
method.");
+
+               JDBCOutputFormat format = 
JDBCOutputFormat.buildJDBCOutputFormat()
+                       .setUsername(username)
+                       .setPassword(password)
+                       .setDBUrl(dbURL)
+                       .setQuery(query)
+                       .setDrivername(driverName)
+                       .setBatchInterval(batchSize)
+                       .setSqlTypes(parameterTypes)
+                       .finish();
+
+               return new JDBCAppendTableSink(format);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
index 4cbdbf1..2497712 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -40,6 +40,7 @@ import java.sql.SQLException;
  */
 public class JDBCOutputFormat extends RichOutputFormat<Row> {
        private static final long serialVersionUID = 1L;
+       static final int DEFAULT_BATCH_INTERVAL = 5000;
 
        private static final Logger LOG = 
LoggerFactory.getLogger(JDBCOutputFormat.class);
 
@@ -48,7 +49,7 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> {
        private String drivername;
        private String dbURL;
        private String query;
-       private int batchInterval = 5000;
+       private int batchInterval = DEFAULT_BATCH_INTERVAL;
 
        private Connection dbConn;
        private PreparedStatement upload;
@@ -206,15 +207,23 @@ public class JDBCOutputFormat extends 
RichOutputFormat<Row> {
 
                if (batchCount >= batchInterval) {
                        // execute batch
-                       try {
-                               upload.executeBatch();
-                               batchCount = 0;
-                       } catch (SQLException e) {
-                               throw new RuntimeException("Execution of JDBC 
statement failed.", e);
-                       }
+                       flush();
                }
        }
 
+       void flush() {
+               try {
+                       upload.executeBatch();
+                       batchCount = 0;
+               } catch (SQLException e) {
+                       throw new RuntimeException("Execution of JDBC statement 
failed.", e);
+               }
+       }
+
+       int[] getTypesArray() {
+               return typesArray;
+       }
+
        /**
         * Executes prepared statement and closes all resources of this 
instance.
         *
@@ -223,12 +232,7 @@ public class JDBCOutputFormat extends 
RichOutputFormat<Row> {
        @Override
        public void close() throws IOException {
                if (upload != null) {
-                       // execute last batch
-                       try {
-                               upload.executeBatch();
-                       } catch (SQLException e) {
-                               throw new RuntimeException("Execution of JDBC 
statement failed.", e);
-                       }
+                       flush();
                        // close the connection
                        try {
                                upload.close();
@@ -238,7 +242,6 @@ public class JDBCOutputFormat extends RichOutputFormat<Row> 
{
                                upload = null;
                        }
                }
-               batchCount = 0;
 
                if (dbConn != null) {
                        try {
@@ -307,19 +310,19 @@ public class JDBCOutputFormat extends 
RichOutputFormat<Row> {
                 */
                public JDBCOutputFormat finish() {
                        if (format.username == null) {
-                               LOG.info("Username was not supplied 
separately.");
+                               LOG.info("Username was not supplied.");
                        }
                        if (format.password == null) {
-                               LOG.info("Password was not supplied 
separately.");
+                               LOG.info("Password was not supplied.");
                        }
                        if (format.dbURL == null) {
-                               throw new IllegalArgumentException("No dababase 
URL supplied.");
+                               throw new IllegalArgumentException("No database 
URL supplied.");
                        }
                        if (format.query == null) {
-                               throw new IllegalArgumentException("No query 
suplied");
+                               throw new IllegalArgumentException("No query 
supplied.");
                        }
                        if (format.drivername == null) {
-                               throw new IllegalArgumentException("No driver 
supplied");
+                               throw new IllegalArgumentException("No driver 
supplied.");
                        }
 
                        return format;

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
new file mode 100644
index 0000000..d2fdef6
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCSinkFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+
+class JDBCSinkFunction extends RichSinkFunction<Row> implements 
CheckpointedFunction {
+       final JDBCOutputFormat outputFormat;
+
+       JDBCSinkFunction(JDBCOutputFormat outputFormat) {
+               this.outputFormat = outputFormat;
+       }
+
+       @Override
+       public void invoke(Row value) throws Exception {
+               outputFormat.writeRecord(value);
+       }
+
+       @Override
+       public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+               outputFormat.flush();
+       }
+
+       @Override
+       public void initializeState(FunctionInitializationContext context) 
throws Exception {
+       }
+
+       @Override
+       public void open(Configuration parameters) throws Exception {
+               super.open(parameters);
+               RuntimeContext ctx = getRuntimeContext();
+               outputFormat.setRuntimeContext(ctx);
+               outputFormat.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
+       }
+
+       @Override
+       public void close() throws Exception {
+               outputFormat.close();
+               super.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
new file mode 100644
index 0000000..c10631c
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BIG_DEC_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.BYTE_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.FLOAT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.SHORT_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+import static 
org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
+
+class JDBCTypeUtil {
+       private static final Map<TypeInformation<?>, Integer> TYPE_MAPPING;
+       private static final Map<Integer, String> SQL_TYPE_NAMES;
+
+       static {
+               HashMap<TypeInformation<?>, Integer> m = new HashMap<>();
+               m.put(STRING_TYPE_INFO, Types.VARCHAR);
+               m.put(BOOLEAN_TYPE_INFO, Types.BOOLEAN);
+               m.put(BYTE_TYPE_INFO, Types.TINYINT);
+               m.put(SHORT_TYPE_INFO, Types.SMALLINT);
+               m.put(INT_TYPE_INFO, Types.INTEGER);
+               m.put(LONG_TYPE_INFO, Types.BIGINT);
+               m.put(FLOAT_TYPE_INFO, Types.FLOAT);
+               m.put(DOUBLE_TYPE_INFO, Types.DOUBLE);
+               m.put(SqlTimeTypeInfo.DATE, Types.DATE);
+               m.put(SqlTimeTypeInfo.TIME, Types.TIME);
+               m.put(SqlTimeTypeInfo.TIMESTAMP, Types.TIMESTAMP);
+               m.put(BIG_DEC_TYPE_INFO, Types.DECIMAL);
+               m.put(BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.BINARY);
+               TYPE_MAPPING = Collections.unmodifiableMap(m);
+
+               HashMap<Integer, String> names = new HashMap<>();
+               names.put(Types.VARCHAR, "VARCHAR");
+               names.put(Types.BOOLEAN, "BOOLEAN");
+               names.put(Types.TINYINT, "TINYINT");
+               names.put(Types.SMALLINT, "SMALLINT");
+               names.put(Types.INTEGER, "INTEGER");
+               names.put(Types.BIGINT, "BIGINT");
+               names.put(Types.FLOAT, "FLOAT");
+               names.put(Types.DOUBLE, "DOUBLE");
+               names.put(Types.CHAR, "CHAR");
+               names.put(Types.DATE, "DATE");
+               names.put(Types.TIME, "TIME");
+               names.put(Types.TIMESTAMP, "TIMESTAMP");
+               names.put(Types.DECIMAL, "DECIMAL");
+               names.put(Types.BINARY, "BINARY");
+               SQL_TYPE_NAMES = Collections.unmodifiableMap(names);
+       }
+
+       private JDBCTypeUtil() {
+       }
+
+       static int typeInformationToSqlType(TypeInformation<?> type) {
+
+               if (TYPE_MAPPING.containsKey(type)) {
+                       return TYPE_MAPPING.get(type);
+               } else if (type instanceof ObjectArrayTypeInfo || type 
instanceof PrimitiveArrayTypeInfo) {
+                       return Types.ARRAY;
+               } else {
+                       throw new IllegalArgumentException("Unsupported type: " 
+ type);
+               }
+       }
+
+       static String getTypeName(int type) {
+               return SQL_TYPE_NAMES.get(type);
+       }
+
+       static String getTypeName(TypeInformation<?> type) {
+               return SQL_TYPE_NAMES.get(typeInformationToSqlType(type));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
new file mode 100644
index 0000000..95305c8
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSinkTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for JDBCAppendTableSink.
+ */
+public class JDBCAppendTableSinkTest {
+       private static final String[] FIELD_NAMES = new String[]{"foo"};
+       private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[]{
+               BasicTypeInfo.STRING_TYPE_INFO
+       };
+       private static final RowTypeInfo ROW_TYPE = new 
RowTypeInfo(FIELD_TYPES, FIELD_NAMES);
+
+       @Test
+       public void testAppendTableSink() throws IOException {
+               JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+                       .setDrivername("foo")
+                       .setDBUrl("bar")
+                       .setQuery("insert into %s (id) values (?)")
+                       .setParameterTypes(FIELD_TYPES)
+                       .build();
+
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Row> ds = 
env.fromCollection(Collections.singleton(Row.of("foo")), ROW_TYPE);
+               sink.emitDataStream(ds);
+
+               Collection<Integer> sinkIds = env.getStreamGraph().getSinkIDs();
+               assertEquals(1, sinkIds.size());
+               int sinkId = sinkIds.iterator().next();
+
+               StreamSink planSink = (StreamSink) 
env.getStreamGraph().getStreamNode(sinkId).getOperator();
+               assertTrue(planSink.getUserFunction() instanceof 
JDBCSinkFunction);
+
+               JDBCSinkFunction sinkFunction = (JDBCSinkFunction) 
planSink.getUserFunction();
+               assertSame(sink.getOutputFormat(), sinkFunction.outputFormat);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testTypeCompatibilityCheck() throws IOException {
+
+               JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
+                       .setDrivername("foo")
+                       .setDBUrl("bar")
+                       .setQuery("INSERT INTO foobar (id) VALUES (?)")
+                       .setParameterTypes(Types.LONG, Types.STRING, Types.INT)
+                       .build();
+
+               sink.configure(
+                       new String[] {"Hello"},
+                       new TypeInformation<?>[] {Types.STRING, Types.INT, 
Types.LONG});
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
index e6626a0..8582387 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.java.io.jdbc;
 import org.apache.flink.types.Row;
 
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -33,6 +32,9 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Types;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
 /**
  * Tests for the {@link JDBCOutputFormat}.
  */
@@ -170,13 +172,7 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                jdbcOutputFormat.open(0, 1);
 
                for (JDBCTestBase.TestEntry entry : TEST_DATA) {
-                       Row row = new Row(5);
-                       row.setField(0, entry.id);
-                       row.setField(1, entry.title);
-                       row.setField(2, entry.author);
-                       row.setField(3, entry.price);
-                       row.setField(4, entry.qty);
-                       jdbcOutputFormat.writeRecord(row);
+                       jdbcOutputFormat.writeRecord(toRow(entry));
                }
 
                jdbcOutputFormat.close();
@@ -188,15 +184,52 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                ) {
                        int recordCount = 0;
                        while (resultSet.next()) {
-                               Assert.assertEquals(TEST_DATA[recordCount].id, 
resultSet.getObject("id"));
-                               
Assert.assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
-                               
Assert.assertEquals(TEST_DATA[recordCount].author, 
resultSet.getObject("author"));
-                               
Assert.assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
-                               Assert.assertEquals(TEST_DATA[recordCount].qty, 
resultSet.getObject("qty"));
+                               assertEquals(TEST_DATA[recordCount].id, 
resultSet.getObject("id"));
+                               assertEquals(TEST_DATA[recordCount].title, 
resultSet.getObject("title"));
+                               assertEquals(TEST_DATA[recordCount].author, 
resultSet.getObject("author"));
+                               assertEquals(TEST_DATA[recordCount].price, 
resultSet.getObject("price"));
+                               assertEquals(TEST_DATA[recordCount].qty, 
resultSet.getObject("qty"));
 
                                recordCount++;
                        }
-                       Assert.assertEquals(TEST_DATA.length, recordCount);
+                       assertEquals(TEST_DATA.length, recordCount);
+               }
+       }
+
+       @Test
+       public void testFlush() throws SQLException, IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                       .setDrivername(DRIVER_CLASS)
+                       .setDBUrl(DB_URL)
+                       .setQuery(String.format(INSERT_TEMPLATE, 
OUTPUT_TABLE_2))
+                       .setBatchInterval(3)
+                       .finish();
+               try (
+                       Connection dbConn = DriverManager.getConnection(DB_URL);
+                       PreparedStatement statement = 
dbConn.prepareStatement(JDBCTestBase.SELECT_ALL_NEWBOOKS_2)
+               ) {
+                       jdbcOutputFormat.open(0, 1);
+                       for (int i = 0; i < 2; ++i) {
+                               
jdbcOutputFormat.writeRecord(toRow(TEST_DATA[i]));
+                       }
+                       try (ResultSet resultSet = statement.executeQuery()) {
+                               assertFalse(resultSet.next());
+                       }
+                       jdbcOutputFormat.writeRecord(toRow(TEST_DATA[2]));
+                       try (ResultSet resultSet = statement.executeQuery()) {
+                               int recordCount = 0;
+                               while (resultSet.next()) {
+                                       assertEquals(TEST_DATA[recordCount].id, 
resultSet.getObject("id"));
+                                       
assertEquals(TEST_DATA[recordCount].title, resultSet.getObject("title"));
+                                       
assertEquals(TEST_DATA[recordCount].author, resultSet.getObject("author"));
+                                       
assertEquals(TEST_DATA[recordCount].price, resultSet.getObject("price"));
+                                       
assertEquals(TEST_DATA[recordCount].qty, resultSet.getObject("qty"));
+                                       recordCount++;
+                               }
+                               assertEquals(3, recordCount);
+                       }
+               } finally {
+                       jdbcOutputFormat.close();
                }
        }
 
@@ -212,4 +245,14 @@ public class JDBCOutputFormatTest extends JDBCTestBase {
                        conn.close();
                }
        }
+
+       private static Row toRow(TestEntry entry) {
+               Row row = new Row(5);
+               row.setField(0, entry.id);
+               row.setField(1, entry.title);
+               row.setField(2, entry.author);
+               row.setField(3, entry.price);
+               row.setField(4, entry.qty);
+               return row;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
index 7189393..1d41d37 100644
--- 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTestBase.java
@@ -39,8 +39,10 @@ public class JDBCTestBase {
        public static final String DB_URL = "jdbc:derby:memory:ebookshop";
        public static final String INPUT_TABLE = "books";
        public static final String OUTPUT_TABLE = "newbooks";
+       public static final String OUTPUT_TABLE_2 = "newbooks2";
        public static final String SELECT_ALL_BOOKS = "select * from " + 
INPUT_TABLE;
        public static final String SELECT_ALL_NEWBOOKS = "select * from " + 
OUTPUT_TABLE;
+       public static final String SELECT_ALL_NEWBOOKS_2 = "select * from " + 
OUTPUT_TABLE_2;
        public static final String SELECT_EMPTY = "select * from books WHERE 
QTY < 0";
        public static final String INSERT_TEMPLATE = "insert into %s (id, 
title, author, price, qty) values (?,?,?,?,?)";
        public static final String SELECT_ALL_BOOKS_SPLIT_BY_ID = 
SELECT_ALL_BOOKS + " WHERE id BETWEEN ? AND ?";
@@ -125,6 +127,7 @@ public class JDBCTestBase {
                try (Connection conn = DriverManager.getConnection(DB_URL + 
";create=true")) {
                        createTable(conn, JDBCTestBase.INPUT_TABLE);
                        createTable(conn, OUTPUT_TABLE);
+                       createTable(conn, OUTPUT_TABLE_2);
                        insertDataIntoInputTable(conn);
                }
        }
@@ -150,6 +153,7 @@ public class JDBCTestBase {
 
                        stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
                        stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE);
+                       stat.executeUpdate("DROP TABLE " + OUTPUT_TABLE_2);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/43e5a81d/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
new file mode 100644
index 0000000..790be78
--- /dev/null
+++ 
b/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTypeUtilTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+
+import org.junit.Test;
+
+import java.sql.Types;
+
+import static 
org.apache.flink.api.java.io.jdbc.JDBCTypeUtil.typeInformationToSqlType;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Testing the type conversions from Flink to SQL types.
+ */
+public class JDBCTypeUtilTest {
+
+       @Test
+       public void testTypeConversions() {
+               assertEquals(Types.INTEGER, 
typeInformationToSqlType(BasicTypeInfo.INT_TYPE_INFO));
+               testUnsupportedType(BasicTypeInfo.VOID_TYPE_INFO);
+               testUnsupportedType(new 
MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+       }
+
+       private static void testUnsupportedType(TypeInformation<?> type) {
+               try {
+                       typeInformationToSqlType(type);
+                       fail();
+               } catch (IllegalArgumentException ignored) {
+               }
+       }
+}

Reply via email to