wuchong commented on a change in pull request #12036:
URL: https://github.com/apache/flink/pull/12036#discussion_r422091825



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.connector.jdbc.JdbcInputFormat;
+import org.apache.flink.connector.jdbc.source.row.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+import org.apache.flink.util.Preconditions;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+
+/**
+ * InputFormat to read data from a database and generate Rows.
+ * The InputFormat has to be configured using the supplied InputFormatBuilder.

Review comment:
       You can keep the original class Javadoc here. 

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.split;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
+
+import java.io.Serializable;
+
+/**
+ * This splits generator actually does nothing but wrapping the query parameters
+ * computed by the user before creating the {@link JDBCInputFormat} instance.
+ *
+ * @deprecated Please use {@link JdbcGenericParameterValuesProvider}.
+ */
+@Deprecated
+public class GenericParameterValuesProvider extends JdbcGenericParameterValuesProvider implements ParameterValuesProvider{

Review comment:
       ```suggestion
   public class GenericParameterValuesProvider extends JdbcGenericParameterValuesProvider implements ParameterValuesProvider {
   ```

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -80,7 +80,7 @@ default String quoteIdentifier(String identifier) {
         * the use of select + update/insert, this performance is poor.
         */
        default Optional<String> getUpsertStatement(
-                       String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+               String tableName, String[] fieldNames, String[] uniqueKeyFields) {

Review comment:
       indent
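
For reference, the kind of statement a dialect's `getUpsertStatement` override produces can be sketched as below. This is a simplified, hypothetical MySQL-flavored builder with the same parameter list as the quoted signature, not the actual Flink dialect code:

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

public class UpsertStatementSketch {

    // Hypothetical, simplified version of a dialect's getUpsertStatement:
    // builds a MySQL-style "INSERT ... ON DUPLICATE KEY UPDATE" statement.
    // uniqueKeyFields is unused here because MySQL infers the conflict
    // target from the table's unique keys.
    static Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        String columns = String.join(", ", fieldNames);
        String placeholders = Arrays.stream(fieldNames)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String updates = Arrays.stream(fieldNames)
                .map(f -> f + " = VALUES(" + f + ")")
                .collect(Collectors.joining(", "));
        return Optional.of("INSERT INTO " + tableName + " (" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates);
    }

    public static void main(String[] args) {
        System.out.println(getUpsertStatement(
                "books", new String[]{"id", "title"}, new String[]{"id"}).get());
    }
}
```

Dialects without native upsert support would return `Optional.empty()` here, which is what the quoted Javadoc's "select + update/insert" fallback remark refers to.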

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.split;
+
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
+
+/**
+ * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
+ * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
+ * implementation.
+ *
+ * @deprecated Please use {@link JdbcParameterValuesProvider}.
+ * Flink proposes class name start with "Jdbc" rather than "JDBC".

Review comment:
       remove this line.
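
As context for the interface being deprecated here, the split-parameter pattern the quoted Javadoc describes can be sketched with a small, Flink-free stand-in (the interface and method names below are illustrative, not the actual Flink API): the provider returns a matrix, and each row supplies the bind parameters of one parallel split query.

```java
import java.io.Serializable;

public class SplitParametersSketch {

    /** Illustrative stand-in for JdbcParameterValuesProvider (not the Flink interface). */
    interface SplitParameterProvider {
        Serializable[][] getParameterValues();
    }

    /**
     * Builds one row of bind parameters per split, e.g. for
     * "SELECT * FROM books WHERE id BETWEEN ? AND ?".
     */
    static Serializable[][] buildSplits() {
        SplitParameterProvider provider = () -> new Serializable[][]{
            {0, 999},
            {1000, 1999},
            {2000, 2999},
        };
        return provider.getParameterValues();
    }

    public static void main(String[] args) {
        Serializable[][] splits = buildSplits();
        System.out.println(splits.length + " splits; first upper bound = " + splits[0][1]);
    }
}
```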

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialects.java
##########
@@ -39,27 +39,27 @@
 /**
  * Default JDBC dialects.
  */
-public final class JDBCDialects {
+public class JdbcDialects {

Review comment:
       Why remove the `final`?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcBatchingOutputFormat.java
##########
@@ -40,10 +39,13 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
-import static org.apache.flink.api.java.io.jdbc.JDBCUtils.setRecordToStatement;
+import static org.apache.flink.connector.jdbc.JdbcUtils.setRecordToStatement;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
-class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> extends AbstractJdbcOutputFormat<In> {
+/**
+ * A JDBC outputFormat support batching records before writing records to database.
+ */
+public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>> extends AbstractJdbcOutputFormat<In> {

Review comment:
       The package-private visibility is on purpose, so that users can't extend this class.
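
The batching behavior the new Javadoc describes boils down to buffering records and flushing once a configured batch size is reached. A minimal, Flink-free sketch of that idea (the real Flink class additionally flushes on a timer and on checkpoints, and writes through a `PreparedStatement`):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingWriterSketch {

    /** Minimal stand-in for the batching idea in JdbcBatchingOutputFormat. */
    static class BatchingWriter<T> {
        private final int batchSize;
        private final List<T> buffer = new ArrayList<>();
        int flushCount = 0; // number of batches written so far

        BatchingWriter(int batchSize) {
            this.batchSize = batchSize;
        }

        void write(T record) {
            buffer.add(record);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        void flush() {
            if (!buffer.isEmpty()) {
                // a real implementation would addBatch()/executeBatch() here
                buffer.clear();
                flushCount++;
            }
        }
    }

    public static void main(String[] args) {
        BatchingWriter<String> writer = new BatchingWriter<>(2);
        writer.write("a");
        writer.write("b"); // triggers a flush (batch size reached)
        writer.write("c");
        writer.flush();    // final flush, e.g. on close
        System.out.println("flushes = " + writer.flushCount);
    }
}
```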

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcOutputFormat.java
##########
@@ -37,15 +36,16 @@
  * @see Row
  * @see DriverManager
  */
+
 /**
  * @deprecated use {@link JdbcBatchingOutputFormat}
  */
 @Deprecated
-public class JDBCOutputFormat extends AbstractJdbcOutputFormat<Row> {
+public class JdbcOutputFormat extends AbstractJdbcOutputFormat<Row> {

Review comment:
       Please remove this class. 

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSinkFunction.java
##########
@@ -31,9 +31,9 @@
  */
 @Deprecated
 class JdbcSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {

Review comment:
       Remove this file? I think nobody is using it, and `GenericJdbcSinkFunction` is not accessible to users.

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/JdbcITCase.java
##########
@@ -35,26 +33,22 @@
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INPUT_TABLE;
-import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.INSERT_TEMPLATE;
-import static org.apache.flink.api.java.io.jdbc.JdbcTestFixture.TEST_DATA;

Review comment:
       I think keeping these static imports can make the code more concise.

##########
File path: flink-connectors/pom.xml
##########
@@ -36,7 +36,7 @@ under the License.
        <packaging>pom</packaging>
 
        <modules>
-               <module>flink-jdbc</module>
+               <module>flink-connector-jdbc</module>

Review comment:
       Put it after the `hive` module in alphabetical order?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/source/row/converter/JdbcRowConverter.java
##########
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.java.io.jdbc.source.row.converter;
+package org.apache.flink.connector.jdbc.source.row.converter;

Review comment:
       Move to the `org.apache.flink.connector.jdbc.source.converter` package? I think the converter is always used for Row (or RowData in the future).
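
For context on what lives in this package: a row converter turns one fetched JDBC result row into the engine's row type. A tiny illustrative stand-in (names are hypothetical, not the actual `JdbcRowConverter` interface), using plain arrays in place of `ResultSet`/`Row`:

```java
public class RowConverterSketch {

    /** Illustrative stand-in for a per-row JDBC-to-engine converter. */
    interface SimpleRowConverter {
        Object[] toRow(Object[] jdbcRow);
    }

    /** Example conversion: pass values through, normalizing strings. */
    static Object[] convert(Object[] jdbcRow) {
        SimpleRowConverter converter = row -> {
            Object[] out = new Object[row.length];
            for (int i = 0; i < row.length; i++) {
                Object v = row[i];
                // trim string columns, copy everything else unchanged
                out[i] = (v instanceof String) ? ((String) v).trim() : v;
            }
            return out;
        };
        return converter.toRow(jdbcRow);
    }

    public static void main(String[] args) {
        Object[] row = convert(new Object[]{1, "  flink  "});
        System.out.println(row[0] + "|" + row[1]);
    }
}
```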

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialect.java
##########
@@ -31,7 +31,7 @@
 /**
  * Handle the SQL dialect of jdbc driver.
  */
-public interface JDBCDialect extends Serializable {
+public interface JdbcDialect extends Serializable {

Review comment:
       How about adding an `@Internal` annotation to indicate that this is not yet ready to be exposed to users?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
