dianfu commented on a change in pull request #13983:
URL: https://github.com/apache/flink/pull/13983#discussion_r531453695
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -212,7 +213,7 @@ def slot_sharing_group(self, slot_sharing_group: str) ->
'DataStream':
self._j_data_stream.slotSharingGroup(slot_sharing_group)
return self
- def map(self, func: Union[Callable, MapFunction], output_type:
TypeInformation = None) \
+ def map(self, func: Union[Callable, MapFunction], output_type:
WrapperTypeInfo = None) \
Review comment:
ditto
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -759,7 +787,7 @@ def __init__(self, j_keyed_stream, original_data_type_info,
origin_stream: DataS
self._original_data_type_info = original_data_type_info
self._origin_stream = origin_stream
- def map(self, func: Union[Callable, MapFunction], output_type:
TypeInformation = None) \
+ def map(self, func: Union[Callable, MapFunction], output_type:
WrapperTypeInfo = None) \
Review comment:
ditto
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -266,6 +278,92 @@ private static Object getPickledBytesFromJavaObject(Object
obj, LogicalType data
}
}
+ public static Object getPickledBytesFromJavaObject(Object obj,
TypeInformation<?> dataType) throws IOException {
+ Pickler pickler = new Pickler();
+ initialize();
+ if (obj == null) {
+ return new byte[0];
+ } else {
+ if (dataType instanceof SqlTimeTypeInfo) {
+ SqlTimeTypeInfo<?> sqlTimeTypeInfo =
SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
+ if (sqlTimeTypeInfo == DATE) {
+ long time;
+ if (obj instanceof LocalDate) {
+ time = ((LocalDate)
(obj)).toEpochDay();
+ } else {
+ time = ((Date)
obj).toLocalDate().toEpochDay();
+ }
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIME) {
+ long time;
+ if (obj instanceof LocalTime) {
+ time = ((LocalTime)
obj).toNanoOfDay();
+ } else {
+ time = ((Time)
obj).toLocalTime().toNanoOfDay();
+ }
+ time = time / 1000;
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIMESTAMP) {
+ if (obj instanceof LocalDateTime) {
+ return
pickler.dumps(Timestamp.valueOf((LocalDateTime) obj));
+ } else {
+ return pickler.dumps(obj);
+ }
+ }
+ } else if (dataType instanceof RowTypeInfo) {
+ Row tmpRow = (Row) obj;
+ TypeInformation<?>[] tmpRowFieldTypes =
((RowTypeInfo) dataType).getFieldTypes();
+ List<Object> rowFieldBytes = new
ArrayList<>(tmpRow.getArity() + 1);
+ rowFieldBytes.add(new
byte[]{tmpRow.getKind().toByteValue()});
+ for (int i = 0; i < tmpRow.getArity(); i++) {
+
rowFieldBytes.add(getPickledBytesFromJavaObject(
+ tmpRow.getField(i),
+ tmpRowFieldTypes[i]));
+ }
+ return rowFieldBytes;
+ } else if (dataType instanceof MapTypeInfo) {
+ List<List<Object>> serializedMapKV = new
ArrayList<>(2);
+ MapTypeInfo<?, ?> mapType = (MapTypeInfo)
dataType;
+ TypeInformation<?> keyType =
mapType.getKeyTypeInfo();
+ TypeInformation<?> valueType =
mapType.getValueTypeInfo();
+ List<Object> keyBytesList = new ArrayList<>();
+ List<Object> valueBytesList = new ArrayList<>();
+ Map<Object, Object> mapObj = (Map) obj;
+ for (Map.Entry<?, ?> entry : mapObj.entrySet())
{
+
keyBytesList.add(getPickledBytesFromJavaObject(entry.getKey(), keyType));
+
valueBytesList.add(getPickledBytesFromJavaObject(entry.getValue(), valueType));
+ }
+ serializedMapKV.add(keyBytesList);
+ serializedMapKV.add(valueBytesList);
+ return pickler.dumps(serializedMapKV);
+ } else if (dataType instanceof BasicArrayTypeInfo) {
+ List<Object> serializedElements = new
ArrayList<>();
+ Object[] objects = (Object[]) obj;
+ BasicArrayTypeInfo<?, ?> arrayType =
(BasicArrayTypeInfo) dataType;
+ TypeInformation<?> elementType =
arrayType.getComponentInfo();
+ for (Object object : objects) {
+
serializedElements.add(getPickledBytesFromJavaObject(object, elementType));
+ }
+ return pickler.dumps(serializedElements);
+ } else if (dataType instanceof PrimitiveArrayTypeInfo) {
+ List<Object> serializedElements = new
ArrayList<>();
+ Object[] objects = (Object[]) obj;
+ PrimitiveArrayTypeInfo<?> arrayType =
(PrimitiveArrayTypeInfo) dataType;
+ TypeInformation<?> elementType =
arrayType.getComponentType();
+ for (Object object : objects) {
+
serializedElements.add(getPickledBytesFromJavaObject(object, elementType));
+ }
+ return pickler.dumps(serializedElements);
+ }
+ if (dataType instanceof BasicTypeInfo &&
+
BasicTypeInfo.getInfoFor(dataType.getTypeClass()) == FLOAT_TYPE_INFO) {
+ return pickler.dumps(String.valueOf(obj));
Review comment:
Could we add a comment here explaining why floats need to be handled specially?
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -266,6 +278,92 @@ private static Object getPickledBytesFromJavaObject(Object
obj, LogicalType data
}
}
+ public static Object getPickledBytesFromJavaObject(Object obj,
TypeInformation<?> dataType) throws IOException {
+ Pickler pickler = new Pickler();
+ initialize();
+ if (obj == null) {
+ return new byte[0];
+ } else {
+ if (dataType instanceof SqlTimeTypeInfo) {
+ SqlTimeTypeInfo<?> sqlTimeTypeInfo =
SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
+ if (sqlTimeTypeInfo == DATE) {
+ long time;
+ if (obj instanceof LocalDate) {
+ time = ((LocalDate)
(obj)).toEpochDay();
+ } else {
+ time = ((Date)
obj).toLocalDate().toEpochDay();
+ }
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIME) {
+ long time;
+ if (obj instanceof LocalTime) {
+ time = ((LocalTime)
obj).toNanoOfDay();
+ } else {
+ time = ((Time)
obj).toLocalTime().toNanoOfDay();
+ }
+ time = time / 1000;
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIMESTAMP) {
+ if (obj instanceof LocalDateTime) {
+ return
pickler.dumps(Timestamp.valueOf((LocalDateTime) obj));
+ } else {
+ return pickler.dumps(obj);
+ }
+ }
+ } else if (dataType instanceof RowTypeInfo) {
+ Row tmpRow = (Row) obj;
+ TypeInformation<?>[] tmpRowFieldTypes =
((RowTypeInfo) dataType).getFieldTypes();
+ List<Object> rowFieldBytes = new
ArrayList<>(tmpRow.getArity() + 1);
+ rowFieldBytes.add(new
byte[]{tmpRow.getKind().toByteValue()});
+ for (int i = 0; i < tmpRow.getArity(); i++) {
+
rowFieldBytes.add(getPickledBytesFromJavaObject(
+ tmpRow.getField(i),
+ tmpRowFieldTypes[i]));
+ }
+ return rowFieldBytes;
+ } else if (dataType instanceof MapTypeInfo) {
Review comment:
The Map type is still not supported in the Python DataStream API, so we
should remove this.
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -123,7 +124,7 @@ def set_max_parallelism(self, max_parallelism: int) ->
'DataStream':
self._j_data_stream.setMaxParallelism(max_parallelism)
return self
- def get_type(self) -> TypeInformation:
+ def get_type(self) -> WrapperTypeInfo:
Review comment:
Why was this changed? WrapperTypeInfo is not a public class, so I think
we should not expose it.
##########
File path: flink-python/pyflink/datastream/utils.py
##########
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import ast
+import datetime
+import pickle
+
+from pyflink.common import typeinfo
+from pyflink.common.typeinfo import WrapperTypeInfo, RowTypeInfo,
TupleTypeInfo, Types, \
+ BasicArrayTypeInfo, PrimitiveArrayTypeInfo
+from pyflink.java_gateway import get_gateway
+
+
+def convert_to_python_obj(data, type_info):
+ if type_info == Types.PICKLED_BYTE_ARRAY():
+ return pickle.loads(data)
+ else:
+ gateway = get_gateway()
+ pickle_bytes = gateway.jvm.PythonBridgeUtils. \
+ getPickledBytesFromJavaObject(data, type_info.get_java_type_info())
+ if isinstance(type_info, RowTypeInfo) or isinstance(type_info,
TupleTypeInfo):
Review comment:
I have not found any code handling TupleTypeInfo in
PythonBridgeUtils.getPickledBytesFromJavaObject. Is it missing, or is it not necessary?
##########
File path: flink-python/pyflink/datastream/data_stream.py
##########
@@ -597,6 +598,33 @@ def add_sink(self, sink_func: SinkFunction) ->
'DataStreamSink':
"""
return
DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function()))
+ def execute_and_collect(self, job_execution_name: str = None, limit: int =
None) \
+ -> Union['CloseableIterator', list]:
+ """
+ Triggers the distributed execution of the streaming dataflow and
returns an iterator over
+ the elements of the given DataStream.
+
+ The DataStream application is executed in the regular distributed
manner on the target
+ environment, and the events from the stream are polled back to this
application process and
Review comment:
```suggestion
environment, and the events from the stream are polled back to this
application process and
```
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -266,6 +278,92 @@ private static Object getPickledBytesFromJavaObject(Object
obj, LogicalType data
}
}
+ public static Object getPickledBytesFromJavaObject(Object obj,
TypeInformation<?> dataType) throws IOException {
+ Pickler pickler = new Pickler();
+ initialize();
+ if (obj == null) {
+ return new byte[0];
+ } else {
+ if (dataType instanceof SqlTimeTypeInfo) {
+ SqlTimeTypeInfo<?> sqlTimeTypeInfo =
SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
+ if (sqlTimeTypeInfo == DATE) {
+ long time;
+ if (obj instanceof LocalDate) {
+ time = ((LocalDate)
(obj)).toEpochDay();
+ } else {
+ time = ((Date)
obj).toLocalDate().toEpochDay();
+ }
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIME) {
+ long time;
+ if (obj instanceof LocalTime) {
+ time = ((LocalTime)
obj).toNanoOfDay();
+ } else {
+ time = ((Time)
obj).toLocalTime().toNanoOfDay();
+ }
+ time = time / 1000;
+ return pickler.dumps(time);
+ } else if (sqlTimeTypeInfo == TIMESTAMP) {
+ if (obj instanceof LocalDateTime) {
+ return
pickler.dumps(Timestamp.valueOf((LocalDateTime) obj));
+ } else {
+ return pickler.dumps(obj);
+ }
+ }
+ } else if (dataType instanceof RowTypeInfo) {
+ Row tmpRow = (Row) obj;
+ TypeInformation<?>[] tmpRowFieldTypes =
((RowTypeInfo) dataType).getFieldTypes();
+ List<Object> rowFieldBytes = new
ArrayList<>(tmpRow.getArity() + 1);
+ rowFieldBytes.add(new
byte[]{tmpRow.getKind().toByteValue()});
+ for (int i = 0; i < tmpRow.getArity(); i++) {
+
rowFieldBytes.add(getPickledBytesFromJavaObject(
+ tmpRow.getField(i),
+ tmpRowFieldTypes[i]));
+ }
+ return rowFieldBytes;
+ } else if (dataType instanceof MapTypeInfo) {
+ List<List<Object>> serializedMapKV = new
ArrayList<>(2);
+ MapTypeInfo<?, ?> mapType = (MapTypeInfo)
dataType;
+ TypeInformation<?> keyType =
mapType.getKeyTypeInfo();
+ TypeInformation<?> valueType =
mapType.getValueTypeInfo();
+ List<Object> keyBytesList = new ArrayList<>();
+ List<Object> valueBytesList = new ArrayList<>();
+ Map<Object, Object> mapObj = (Map) obj;
+ for (Map.Entry<?, ?> entry : mapObj.entrySet())
{
+
keyBytesList.add(getPickledBytesFromJavaObject(entry.getKey(), keyType));
+
valueBytesList.add(getPickledBytesFromJavaObject(entry.getValue(), valueType));
+ }
+ serializedMapKV.add(keyBytesList);
+ serializedMapKV.add(valueBytesList);
+ return pickler.dumps(serializedMapKV);
+ } else if (dataType instanceof BasicArrayTypeInfo) {
Review comment:
We could refactor a bit to remove the duplicated code shared by
BasicArrayTypeInfo and PrimitiveArrayTypeInfo.
##########
File path:
flink-python/src/main/java/org/apache/flink/api/common/python/PythonBridgeUtils.java
##########
@@ -266,6 +278,92 @@ private static Object getPickledBytesFromJavaObject(Object
obj, LogicalType data
}
}
+ public static Object getPickledBytesFromJavaObject(Object obj,
TypeInformation<?> dataType) throws IOException {
+ Pickler pickler = new Pickler();
+ initialize();
+ if (obj == null) {
+ return new byte[0];
+ } else {
+ if (dataType instanceof SqlTimeTypeInfo) {
+ SqlTimeTypeInfo<?> sqlTimeTypeInfo =
SqlTimeTypeInfo.getInfoFor(dataType.getTypeClass());
+ if (sqlTimeTypeInfo == DATE) {
+ long time;
+ if (obj instanceof LocalDate) {
+ time = ((LocalDate)
(obj)).toEpochDay();
+ } else {
+ time = ((Date)
obj).toLocalDate().toEpochDay();
Review comment:
Two thoughts here:
- Isn't it always a Date?
- Why not just pickle the Date object? I assume pickle could handle the
Date/Time/Timestamp objects properly.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]