dianfu commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r923115751


##########
docs/layouts/shortcodes/py_download_link.html:
##########
@@ -19,12 +19,68 @@
 Generates the PyFlink download connector page.
 */}}
 {{ $name := .Get 0 }}
-{{ $text := .Get 1 }}
 {{ $connectors := .Site.Data.sql_connectors }}
 {{ $connector := index $connectors $name }}
 
-<p>In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
-dependencies are required: <a href="{{- partial "docs/interpolate" $connector.sql_url -}}">{{ $text }}</a>.
+<p>
+{{ if eq $.Site.Language.Lang "en" }}
+In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
+dependencies are required:
+{{ else if eq $.Site.Language.Lang "zh" }}
+为了在 PyFlink 作业中使用 {{ $connector.name }} {{ $connector.category }} ,需要添加下列依赖:
+{{ end }}
+{{ if eq $connector.versions nil }}
+<table>
+    <thead>
+    <th style="text-align:left">SQL Client JAR</th>

Review Comment:
   ```suggestion
       <th style="text-align:left">PyFlink JAR</th>
   ```



##########
flink-python/pyflink/common/types.py:
##########
@@ -277,3 +290,33 @@ def __iter__(self):
 
     def __len__(self):
         return len(self._values)
+
+    def to_java_row(self):

Review Comment:
   What about converting it to a utility method and moving it into java_utils?
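A purely structural sketch of that refactor (the helper name `to_j_flink_row` and the stripped-down `Row` class below are invented stand-ins; the real helper would live in java_utils and build an `org.apache.flink.types.Row` through the Py4J gateway):

```python
# Sketch: move the conversion off the Row class into a module-level
# utility, so pyflink.common.types stays free of gateway details.

class Row:
    """Minimal stand-in for pyflink.common.Row."""
    def __init__(self, *values):
        self._values = list(values)

def to_j_flink_row(row):
    # Hypothetical java_utils helper: in the real code this would
    # construct a Java Row via the Py4J gateway; here we just
    # return the field values to show the call shape.
    return tuple(row._values)

print(to_j_flink_row(Row(1, "a")))  # (1, 'a')
```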



##########
docs/layouts/shortcodes/py_download_link.html:
##########
@@ -19,12 +19,68 @@
 Generates the PyFlink download connector page.
 */}}
 {{ $name := .Get 0 }}
-{{ $text := .Get 1 }}
 {{ $connectors := .Site.Data.sql_connectors }}
 {{ $connector := index $connectors $name }}
 
-<p>In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
-dependencies are required: <a href="{{- partial "docs/interpolate" $connector.sql_url -}}">{{ $text }}</a>.
+<p>
+{{ if eq $.Site.Language.Lang "en" }}
+In order to use the {{ $connector.name }} {{ $connector.category }} in PyFlink jobs, the following
+dependencies are required:
+{{ else if eq $.Site.Language.Lang "zh" }}
+为了在 PyFlink 作业中使用 {{ $connector.name }} {{ $connector.category }} ,需要添加下列依赖:
+{{ end }}
+{{ if eq $connector.versions nil }}
+<table>
+    <thead>
+    <th style="text-align:left">SQL Client JAR</th>
+    </thead>
+    <tbody>
+    <tr>
+        {{ if eq $connector.builtin true }}
+        <td style="text-align: left">Built-in</td>
+        {{ else if $.Site.Params.IsStable }}
+        {{ if eq $connector.sql_url nil }}
+        <td style="text-align:left">There is not sql jar available yet.</td>

Review Comment:
   ```suggestion
           <td style="text-align:left">There is no sql jar available yet.</td>
   ```



##########
flink-python/pyflink/common/types.py:
##########
@@ -39,6 +41,17 @@ def __str__(self):
         else:
             return '-D'
 
+    def to_j_row_kind(self):
+        JRowKind = get_gateway().jvm.org.apache.flink.types.RowKind
+        if self.value == RowKind.INSERT.value:
+            return JRowKind.INSERT
+        elif self.value == RowKind.UPDATE_BEFORE.value:
+            return JRowKind.UPDATE_BEFORE
+        elif self.value == RowKind.UPDATE_AFTER.value:
+            return JRowKind.UPDATE_AFTER
+        else:
+            return JRowKind.DELETE

Review Comment:
   Could be simplified as follows:
   ```
   JRowKind = get_gateway().jvm.org.apache.flink.types.RowKind
   return getattr(JRowKind, self.name)
   ```
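Since the Python enum member names mirror the Java constants one-to-one, the `getattr` dispatch can be shown with a self-contained sketch (the `JRowKind` stand-in below is hypothetical; the real lookup goes through the Py4J gateway object):

```python
from enum import Enum
from types import SimpleNamespace

# Stand-in for org.apache.flink.types.RowKind on the JVM side
# (hypothetical values; real code gets this via get_gateway().jvm).
JRowKind = SimpleNamespace(INSERT="+I", UPDATE_BEFORE="-U",
                           UPDATE_AFTER="+U", DELETE="-D")

class RowKind(Enum):
    INSERT = 0
    UPDATE_BEFORE = 1
    UPDATE_AFTER = 2
    DELETE = 3

    def to_j_row_kind(self):
        # Member names match the Java enum constants exactly,
        # so one getattr replaces the whole if/elif chain.
        return getattr(JRowKind, self.name)

print(RowKind.UPDATE_BEFORE.to_j_row_kind())  # -U
```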



##########
flink-python/pyflink/common/serialization.py:
##########
@@ -38,6 +40,10 @@ class SerializationSchema(object):
     def __init__(self, j_serialization_schema=None):
         self._j_serialization_schema = j_serialization_schema
 
+    @abstractmethod
+    def require_row_type(self) -> bool:

Review Comment:
   Seems a little weird; what about removing this method?



##########
docs/content.zh/docs/connectors/datastream/formats/csv.md:
##########
@@ -38,6 +38,8 @@ To use the CSV format you need to add the Flink CSV dependency to your project:
 </dependency>
 ```
 
+{{< py_download_link "csv" >}}

Review Comment:
   CSV support is built-in, so I think this is unnecessary.



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:
+        pass
+
+    @abstractmethod
+    def get_pre_transform(self) -> 'TransformAppender':

Review Comment:
   Should `apply(self, ds)` declare a return type, i.e. `apply(self, ds) -> 'ds'`?
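For illustration, the annotated shape might look like the following sketch (the `DataStream` stub and `DoubleAppender` subclass are invented stand-ins; the real `apply` receives and returns a PyFlink `DataStream`):

```python
from abc import ABC, abstractmethod

class DataStream:
    """Minimal stand-in for pyflink.datastream.DataStream."""
    def __init__(self, values):
        self.values = values

class TransformAppender(ABC):
    @abstractmethod
    def apply(self, ds: 'DataStream') -> 'DataStream':
        """Append a transformation and return the resulting stream."""

class DoubleAppender(TransformAppender):
    def apply(self, ds: 'DataStream') -> 'DataStream':
        # Illustrative transformation: double every element.
        return DataStream([v * 2 for v in ds.values])

print(DoubleAppender().apply(DataStream([1, 2, 3])).values)  # [2, 4, 6]
```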



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):

Review Comment:
   Rename it to SupportsPreprocessing?



##########
flink-python/pyflink/datastream/__init__.py:
##########
@@ -166,6 +166,10 @@
      A streaming data source that pulls a parallel data stream from Apache Kafka.
     - :class:`connectors.FlinkKafkaProducer`:
       A streaming data sink to produce data into a Kafka topic.
+    - :class:`connectors.KafkaSource`:
+      The new API to read data in parallel from Apache Kafka.
+    - :class:`connectors.FlinkKafkaProducer`:

Review Comment:
   Duplicate of line 167?



##########
flink-python/pyflink/datastream/connectors/base.py:
##########
@@ -52,6 +53,24 @@ def __init__(self, sink: Union[str, JavaObject]):
         super(Sink, self).__init__(sink)
 
 
+class TransformAppender(ABC):
+
+    @abstractmethod
+    def apply(self, ds):
+        pass
+
+
+class PreTransformWrapper(ABC):
+
+    @abstractmethod
+    def need_pre_transform(self) -> bool:

Review Comment:
   remove this method?



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {
+        return (T)
+                Proxy.newProxyInstance(
+                        clazz.getClassLoader(),
+                        new Class[] {clazz},
+                        new FirstColumnSelectorInvocationHandler());
+    }
+
+    /** The serializable {@link InvocationHandler} as the proxy for first column selector. */
+    public static class FirstColumnSelectorInvocationHandler
+            implements InvocationHandler, Serializable {
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+            Preconditions.checkArgument(method.getName().equals("apply"));
+            Preconditions.checkArgument(args.length == 1);
+            Preconditions.checkArgument(args[0] instanceof Row);
+            Row row = (Row) args[0];
+            Preconditions.checkArgument(row.getArity() >= 1);
+            return row.getField(0);
+        }
+    }
+
+    /**
+     * A {@link SerializationSchema} for {@link Row} that only serialize the second column using a
+     * wrapped {@link SerializationSchema} for {@link T}.
+     *
+     * @param <T> The actual data type wrapped in the Row.
+     */
+    public static class SecondColumnSerializationSchema<T> implements SerializationSchema<Row> {
+

Review Comment:
   Add `private static final long serialVersionUID = 1L;`



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {

Review Comment:
   ```suggestion
       public static <T> T createFirstColumnTopicSelector(Class<T> clazz) {
   ```



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.python.util;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.function.Function;
+
+/** Utility class for using DataStream connectors in Python. */
+public class PythonConnectorUtils {
+
+    /**
+     * Creates a selector that returns the first column of a row, and cast it to {@code clazz}.
+     * {@code T} should be a sub interface of {@link Function}, which accepts a {@link Row}.
+     *
+     * @param clazz The desired selector class to cast to, e.g. TopicSelector.class for Kafka.
+     * @param <T> An interface
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createFirstColumnSelector(Class<T> clazz) {
+        return (T)
+                Proxy.newProxyInstance(
+                        clazz.getClassLoader(),
+                        new Class[] {clazz},
+                        new FirstColumnSelectorInvocationHandler());
+    }
+
+    /** The serializable {@link InvocationHandler} as the proxy for first column selector. */
+    public static class FirstColumnSelectorInvocationHandler

Review Comment:
   ```suggestion
       public static class FirstColumnTopicSelectorInvocationHandler
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
