This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9f113e4 Revert "[FLINK-17118][python] Add Cython support for
primitive data types (#11718)"
9f113e4 is described below
commit 9f113e4706f4110142c8aad74912e21bb995e497
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Apr 17 16:25:06 2020 +0200
Revert "[FLINK-17118][python] Add Cython support for primitive data types
(#11718)"
This reverts commit 1a5b35b1e1ea79a233cacc88e4574f446aba52ae.
---
.gitignore | 2 -
flink-python/MANIFEST.in | 2 -
flink-python/pyflink/fn_execution/coder_impl.py | 2 +-
flink-python/pyflink/fn_execution/coders.py | 2 +-
.../pyflink/fn_execution/fast_coder_impl.pxd | 192 -------
.../pyflink/fn_execution/fast_coder_impl.pyx | 553 ---------------------
.../{test_coders.py => test_coders_common.py} | 9 -
.../pyflink/fn_execution/tests/test_fast_coders.py | 141 ------
flink-python/setup.py | 28 +-
flink-python/tox.ini | 8 +-
10 files changed, 5 insertions(+), 934 deletions(-)
diff --git a/.gitignore b/.gitignore
index 886d61b..e89f622 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,8 +36,6 @@ flink-python/dev/.conda/
flink-python/dev/log/
flink-python/dev/.stage.txt
flink-python/.eggs/
-flink-python/**/*.c
-flink-python/**/*.so
atlassian-ide-plugin.xml
out/
/docs/api
diff --git a/flink-python/MANIFEST.in b/flink-python/MANIFEST.in
index edef99d..a8e2d61 100644
--- a/flink-python/MANIFEST.in
+++ b/flink-python/MANIFEST.in
@@ -30,5 +30,3 @@ include pyflink/LICENSE
include pyflink/NOTICE
include pyflink/README.txt
recursive-exclude deps/opt/python *
-recursive-include pyflink/fn_execution *.pxd
-recursive-include pyflink/fn_execution *.pyx
diff --git a/flink-python/pyflink/fn_execution/coder_impl.py b/flink-python/pyflink/fn_execution/coder_impl.py
index d4aac4d..743ce7b 100644
--- a/flink-python/pyflink/fn_execution/coder_impl.py
+++ b/flink-python/pyflink/fn_execution/coder_impl.py
@@ -237,7 +237,7 @@ class TinyIntCoderImpl(StreamCoderImpl):
return struct.unpack('b', in_stream.read(1))[0]
-class SmallIntCoderImpl(StreamCoderImpl):
+class SmallIntImpl(StreamCoderImpl):
def encode_to_stream(self, value, out_stream, nested):
out_stream.write(struct.pack('>h', value))
diff --git a/flink-python/pyflink/fn_execution/coders.py b/flink-python/pyflink/fn_execution/coders.py
index 4d9bfda..d2be4fd 100644
--- a/flink-python/pyflink/fn_execution/coders.py
+++ b/flink-python/pyflink/fn_execution/coders.py
@@ -262,7 +262,7 @@ class SmallIntCoder(DeterministicCoder):
"""
def _create_impl(self):
- return coder_impl.SmallIntCoderImpl()
+ return coder_impl.SmallIntImpl()
def to_type_hint(self):
return int
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd b/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
deleted file mode 100644
index 2156247..0000000
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pxd
+++ /dev/null
@@ -1,192 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# cython: language_level=3
-
-cimport libc.stdint
-
-from apache_beam.coders.coder_impl cimport StreamCoderImpl, OutputStream,
InputStream
-
-# InputStreamAndFunctionWrapper wraps the user-defined function
-# and input_stream_wrapper in operations
-cdef class InputStreamAndFunctionWrapper:
- # user-defined function
- cdef readonly object func
- cdef InputStreamWrapper input_stream_wrapper
-
-# InputStreamWrapper wraps input_stream and related infos used to decode data
-cdef class InputStreamWrapper:
- cdef InputStream input_stream
- cdef list input_field_coders
- cdef TypeName*input_field_type
- cdef CoderType*input_coder_type
- cdef libc.stdint.int32_t input_field_count
- cdef libc.stdint.int32_t input_leading_complete_bytes_num
- cdef libc.stdint.int32_t input_remaining_bits_num
- cdef size_t input_buffer_size
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
- cdef readonly StreamCoderImpl _value_coder
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
- # the input field coders and related args used to decode input_stream data
- cdef list _input_field_coders
- cdef TypeName*_input_field_type
- cdef CoderType*_input_coder_type
- cdef libc.stdint.int32_t _input_field_count
- cdef libc.stdint.int32_t _input_leading_complete_bytes_num
- cdef libc.stdint.int32_t _input_remaining_bits_num
-
- # the output field coders and related args used to encode data to
output_stream
- cdef readonly list _output_field_coders
- cdef TypeName*_output_field_type
- cdef CoderType*_output_coder_type
- cdef libc.stdint.int32_t _output_field_count
- cdef libc.stdint.int32_t _output_leading_complete_bytes_num
- cdef libc.stdint.int32_t _output_remaining_bits_num
-
- cdef bint*_null_mask
- cdef unsigned char*_null_byte_search_table
-
- # the char pointer used to store encoded data of output_stream
- cdef char*_output_data
- cdef size_t _output_buffer_size
- cdef size_t _output_pos
-
- # the tmp char pointer used to store encoded data of every row
- cdef char*_tmp_output_data
- cdef size_t _tmp_output_buffer_size
- cdef size_t _tmp_output_pos
-
- # the char pointer used to map the decoded data of input_stream
- cdef char*_input_data
- cdef size_t _input_pos
- cdef size_t _input_buffer_size
-
- # used to store the result of Python user-defined function
- cdef list row
-
- # the Python user-defined function
- cdef object func
-
- # initial attribute
- cdef void _init_attribute(self)
-
- # wrap input_stream
- cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream,
size_t size)
-
- cdef void _write_null_mask(self, value, libc.stdint.int32_t
leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num)
- cdef void _read_null_mask(self, bint*null_mask, libc.stdint.int32_t
leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num)
-
- cdef void _prepare_encode(self, InputStreamAndFunctionWrapper
input_stream_and_function_wrapper,
- OutputStream out_stream)
-
- cdef void _maybe_flush(self, OutputStream out_stream)
- # Because output_buffer will be reallocated during encoding data, we need
to remap output_buffer
- # to the data pointer of output_stream
- cdef void _map_output_data_to_output_stream(self, OutputStream out_stream)
- cdef void _copy_to_output_buffer(self)
-
- # encode data to output_stream
- cdef void _encode_one_row(self, value)
- cdef void _encode_field_simple(self, TypeName field_type, item)
- cdef void _extend(self, size_t missing)
- cdef void _encode_byte(self, unsigned char val)
- cdef void _encode_smallint(self, libc.stdint.int16_t v)
- cdef void _encode_int(self, libc.stdint.int32_t v)
- cdef void _encode_bigint(self, libc.stdint.int64_t v)
- cdef void _encode_float(self, float v)
- cdef void _encode_double(self, double v)
- cdef void _encode_bytes(self, char*b)
-
- # decode data from input_stream
- cdef void _decode_next_row(self)
- cdef object _decode_field_simple(self, TypeName field_type)
- cdef unsigned char _decode_byte(self) except? -1
- cdef libc.stdint.int16_t _decode_smallint(self) except? -1
- cdef libc.stdint.int32_t _decode_int(self) except? -1
- cdef libc.stdint.int64_t _decode_bigint(self) except? -1
- cdef float _decode_float(self) except? -1
- cdef double _decode_double(self) except? -1
- cdef bytes _decode_bytes(self)
-
-cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
- cdef void _encode_end_message(self)
-
-cdef enum CoderType:
- UNDEFINED = -1
- SIMPLE = 0
- COMPLEX = 1
-
-cdef enum TypeName:
- NONE = -1
- ROW = 0
- TINYINT = 1
- SMALLINT = 2
- INT = 3
- BIGINT = 4
- DECIMAL = 5
- FLOAT = 6
- DOUBLE = 7
- DATE = 8
- TIME = 9
- TIMESTAMP = 10
- BOOLEAN = 11
- BINARY = 12
- CHAR = 13
- ARRAY = 14
- MAP = 15
- LOCAL_ZONED_TIMESTAMP = 16
-
-cdef class BaseCoder:
- cpdef CoderType coder_type(self)
- cpdef TypeName type_name(self)
-
-cdef class TinyIntCoderImpl(BaseCoder):
- pass
-
-cdef class SmallIntCoderImpl(BaseCoder):
- pass
-
-cdef class IntCoderImpl(BaseCoder):
- pass
-
-cdef class BigIntCoderImpl(BaseCoder):
- pass
-
-cdef class BooleanCoderImpl(BaseCoder):
- pass
-
-cdef class FloatCoderImpl(BaseCoder):
- pass
-
-cdef class DoubleCoderImpl(BaseCoder):
- pass
-
-cdef class BinaryCoderImpl(BaseCoder):
- pass
-
-cdef class CharCoderImpl(BaseCoder):
- pass
-
-cdef class DateCoderImpl(BaseCoder):
- pass
-
-cdef class TimeCoderImpl(BaseCoder):
- pass
diff --git a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx b/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
deleted file mode 100644
index bfc08202..0000000
--- a/flink-python/pyflink/fn_execution/fast_coder_impl.pyx
+++ /dev/null
@@ -1,553 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-# cython: language_level = 3
-# cython: infer_types = True
-# cython: profile=True
-# cython: boundscheck=False, wraparound=False, initializedcheck=False,
cdivision=True
-
-cimport libc.stdlib
-from libc.string cimport strlen
-
-import datetime
-
-cdef class InputStreamAndFunctionWrapper:
- def __cinit__(self, func, input_stream_wrapper):
- self.func = func
- self.input_stream_wrapper = input_stream_wrapper
-
-cdef class PassThroughLengthPrefixCoderImpl(StreamCoderImpl):
- def __cinit__(self, value_coder):
- self._value_coder = value_coder
-
- cpdef encode_to_stream(self, value, OutputStream out_stream, bint nested):
- self._value_coder.encode_to_stream(value, out_stream, nested)
-
- cpdef decode_from_stream(self, InputStream in_stream, bint nested):
- return self._value_coder.decode_from_stream(in_stream, nested)
-
- cpdef get_estimated_size_and_observables(self, value, bint nested=False):
- return 0, []
-
-cdef class TableFunctionRowCoderImpl(FlattenRowCoderImpl):
- def __init__(self, flatten_row_coder):
- super(TableFunctionRowCoderImpl,
self).__init__(flatten_row_coder._output_field_coders)
-
- cpdef encode_to_stream(self, input_stream_and_function_wrapper,
OutputStream out_stream,
- bint nested):
- self._prepare_encode(input_stream_and_function_wrapper, out_stream)
- while self._input_buffer_size > self._input_pos:
- self._decode_next_row()
- result = self.func(self.row)
- if result:
- for value in result:
- if self._output_field_count == 1:
- value = (value,)
- self._encode_one_row(value)
- self._maybe_flush(out_stream)
- self._encode_end_message()
-
- self._map_output_data_to_output_stream(out_stream)
-
- # write 0x00 as end message
- cdef void _encode_end_message(self):
- if self._output_buffer_size < self._output_pos + 2:
- self._extend(2)
- self._output_data[self._output_pos] = 0x01
- self._output_data[self._output_pos + 1] = 0x00
- self._output_pos += 2
-
-cdef class FlattenRowCoderImpl(StreamCoderImpl):
- def __init__(self, field_coders):
- self._output_field_coders = field_coders
- self._output_field_count = len(self._output_field_coders)
- self._output_field_type = <TypeName*> libc.stdlib.malloc(
- self._output_field_count * sizeof(TypeName))
- self._output_coder_type = <CoderType*> libc.stdlib.malloc(
- self._output_field_count * sizeof(CoderType))
- self._output_leading_complete_bytes_num = self._output_field_count // 8
- self._output_remaining_bits_num = self._output_field_count % 8
- self._tmp_output_buffer_size = 1024
- self._tmp_output_pos = 0
- self._tmp_output_data = <char*>
libc.stdlib.malloc(self._tmp_output_buffer_size)
- self._null_byte_search_table = <unsigned char*> libc.stdlib.malloc(
- 8 * sizeof(unsigned char))
- self._init_attribute()
-
- cpdef decode_from_stream(self, InputStream in_stream, bint nested):
- cdef InputStreamWrapper input_stream_wrapper
- input_stream_wrapper = self._wrap_input_stream(in_stream,
in_stream.size())
- return input_stream_wrapper
-
- cpdef encode_to_stream(self, input_stream_and_function_wrapper,
OutputStream out_stream,
- bint nested):
- cdef list result
- self._prepare_encode(input_stream_and_function_wrapper, out_stream)
- while self._input_buffer_size > self._input_pos:
- self._decode_next_row()
- result = self.func(self.row)
- self._encode_one_row(result)
- self._maybe_flush(out_stream)
- self._map_output_data_to_output_stream(out_stream)
-
- cdef void _init_attribute(self):
- self._null_byte_search_table[0] = 0x80
- self._null_byte_search_table[1] = 0x40
- self._null_byte_search_table[2] = 0x20
- self._null_byte_search_table[3] = 0x10
- self._null_byte_search_table[4] = 0x08
- self._null_byte_search_table[5] = 0x04
- self._null_byte_search_table[6] = 0x02
- self._null_byte_search_table[7] = 0x01
- for i in range(self._output_field_count):
- self._output_field_type[i] =
self._output_field_coders[i].type_name()
- self._output_coder_type[i] =
self._output_field_coders[i].coder_type()
-
- cdef InputStreamWrapper _wrap_input_stream(self, InputStream input_stream,
size_t size):
- # wrappers the input field coders and input_stream together
- # so that it can be transposed to operations
- cdef InputStreamWrapper input_stream_wrapper
- input_stream_wrapper = InputStreamWrapper()
- input_stream_wrapper.input_stream = input_stream
- input_stream_wrapper.input_field_coders = self._output_field_coders
- input_stream_wrapper.input_remaining_bits_num =
self._output_remaining_bits_num
- input_stream_wrapper.input_leading_complete_bytes_num = \
- self._output_leading_complete_bytes_num
- input_stream_wrapper.input_field_count = self._output_field_count
- input_stream_wrapper.input_field_type = self._output_field_type
- input_stream_wrapper.input_coder_type = self._output_coder_type
- input_stream_wrapper.input_stream.pos = size
- input_stream_wrapper.input_buffer_size = size
- return input_stream_wrapper
-
- cdef void _encode_one_row(self, value):
- cdef libc.stdint.int32_t i
- self._write_null_mask(value, self._output_leading_complete_bytes_num,
- self._output_remaining_bits_num)
- for i in range(self._output_field_count):
- item = value[i]
- if item is not None:
- if self._output_coder_type[i] == SIMPLE:
- self._encode_field_simple(self._output_field_type[i], item)
-
- self._copy_to_output_buffer()
-
- cdef void _read_null_mask(self, bint*null_mask,
- libc.stdint.int32_t
input_leading_complete_bytes_num,
- libc.stdint.int32_t input_remaining_bits_num):
- cdef libc.stdint.int32_t field_pos, i
- cdef unsigned char b
- field_pos = 0
- for _ in range(input_leading_complete_bytes_num):
- b = self._input_data[self._input_pos]
- self._input_pos += 1
- for i in range(8):
- null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
- field_pos += 1
-
- if input_remaining_bits_num:
- b = self._input_data[self._input_pos]
- self._input_pos += 1
- for i in range(input_remaining_bits_num):
- null_mask[field_pos] = (b & self._null_byte_search_table[i]) > 0
- field_pos += 1
-
- cdef void _decode_next_row(self):
- cdef libc.stdint.int32_t i
- # skip prefix variable int length
- while self._input_data[self._input_pos] & 0x80:
- self._input_pos += 1
- self._input_pos += 1
- self._read_null_mask(self._null_mask,
self._input_leading_complete_bytes_num,
- self._input_remaining_bits_num)
- for i in range(self._input_field_count):
- if self._null_mask[i]:
- self.row[i] = None
- else:
- if self._input_coder_type[i] == SIMPLE:
- self.row[i] =
self._decode_field_simple(self._input_field_type[i])
-
- cdef object _decode_field_simple(self, TypeName field_type):
- cdef libc.stdint.int32_t value, minutes, seconds, hours
- cdef libc.stdint.int64_t milliseconds
- if field_type == TINYINT:
- # tinyint
- return self._decode_byte()
- elif field_type == SMALLINT:
- # smallint
- return self._decode_smallint()
- elif field_type == INT:
- # int
- return self._decode_int()
- elif field_type == BIGINT:
- # bigint
- return self._decode_bigint()
- elif field_type == BOOLEAN:
- # boolean
- return not not self._decode_byte()
- elif field_type == FLOAT:
- # float
- return self._decode_float()
- elif field_type == DOUBLE:
- # double
- return self._decode_double()
- elif field_type == BINARY:
- # bytes
- return self._decode_bytes()
- elif field_type == CHAR:
- # str
- return self._decode_bytes().decode("utf-8")
- elif field_type == DATE:
- # Date
- # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
- # The value of EPOCH_ORDINAL is 719163
- return datetime.date.fromordinal(self._decode_int() + 719163)
- elif field_type == TIME:
- # Time
- value = self._decode_int()
- seconds = value // 1000
- milliseconds = value % 1000
- minutes = seconds // 60
- seconds %= 60
- hours = minutes // 60
- minutes %= 60
- return datetime.time(hours, minutes, seconds, milliseconds * 1000)
-
- cdef unsigned char _decode_byte(self) except? -1:
- self._input_pos += 1
- return <unsigned char> self._input_data[self._input_pos - 1]
-
- cdef libc.stdint.int16_t _decode_smallint(self) except? -1:
- self._input_pos += 2
- return (<unsigned char> self._input_data[self._input_pos - 1]
- | <libc.stdint.uint32_t> <unsigned char>
self._input_data[self._input_pos - 2] << 8)
-
- cdef libc.stdint.int32_t _decode_int(self) except? -1:
- self._input_pos += 4
- return (<unsigned char> self._input_data[self._input_pos - 1]
- | <libc.stdint.uint32_t> <unsigned char>
self._input_data[self._input_pos - 2] << 8
- | <libc.stdint.uint32_t> <unsigned char>
self._input_data[self._input_pos - 3] << 16
- | <libc.stdint.uint32_t> <unsigned char> self._input_data[
- self._input_pos - 4] << 24)
-
- cdef libc.stdint.int64_t _decode_bigint(self) except? -1:
- self._input_pos += 8
- return (<unsigned char> self._input_data[self._input_pos - 1]
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 2] << 8
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 3] << 16
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 4] << 24
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 5] << 32
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 6] << 40
- | <libc.stdint.uint64_t> <unsigned char>
self._input_data[self._input_pos - 7] << 48
- | <libc.stdint.uint64_t> <unsigned char> self._input_data[
- self._input_pos - 8] << 56)
-
- cdef float _decode_float(self) except? -1:
- cdef libc.stdint.int32_t as_long = self._decode_int()
- return (<float*> <char*> &as_long)[0]
-
- cdef double _decode_double(self) except? -1:
- cdef libc.stdint.int64_t as_long = self._decode_bigint()
- return (<double*> <char*> &as_long)[0]
-
- cdef bytes _decode_bytes(self):
- cdef libc.stdint.int32_t size = self._decode_int()
- self._input_pos += size
- return self._input_data[self._input_pos - size: self._input_pos]
-
- cdef void _prepare_encode(self, InputStreamAndFunctionWrapper
input_stream_and_function_wrapper,
- OutputStream out_stream):
- cdef InputStreamWrapper input_stream_wrapper
- # get the data pointer of output_stream
- self._output_data = out_stream.data
- self._output_pos = out_stream.pos
- self._output_buffer_size = out_stream.buffer_size
- self._tmp_output_pos = 0
-
- input_stream_wrapper =
input_stream_and_function_wrapper.input_stream_wrapper
- # get the data pointer of input_stream
- self._input_data = input_stream_wrapper.input_stream.allc
- self._input_buffer_size = input_stream_wrapper.input_buffer_size
-
- # get the infos of input coder which will be used to decode data from
input_stream
- self._input_field_count = input_stream_wrapper.input_field_count
- self._input_leading_complete_bytes_num =
input_stream_wrapper.input_leading_complete_bytes_num
- self._input_remaining_bits_num =
input_stream_wrapper.input_remaining_bits_num
- self._input_field_type = input_stream_wrapper.input_field_type
- self._input_coder_type = input_stream_wrapper.input_coder_type
- self._input_field_coders = input_stream_wrapper.input_field_coders
- self._null_mask = <bint*> libc.stdlib.malloc(self._input_field_count *
sizeof(bint))
- self._input_pos = 0
-
- # initial the result row and get the Python user-defined function
- self.row = [None for _ in range(self._input_field_count)]
- self.func = input_stream_and_function_wrapper.func
-
- cdef void _encode_field_simple(self, TypeName field_type, item):
- cdef libc.stdint.int32_t hour, minute, seconds, microsecond,
milliseconds
- if field_type == TINYINT:
- # tinyint
- self._encode_byte(item)
- elif field_type == SMALLINT:
- # smallint
- self._encode_smallint(item)
- elif field_type == INT:
- # int
- self._encode_int(item)
- elif field_type == BIGINT:
- # bigint
- self._encode_bigint(item)
- elif field_type == BOOLEAN:
- # boolean
- self._encode_byte(item)
- elif field_type == FLOAT:
- # float
- self._encode_float(item)
- elif field_type == DOUBLE:
- # double
- self._encode_double(item)
- elif field_type == BINARY:
- # bytes
- self._encode_bytes(item)
- elif field_type == CHAR:
- # str
- self._encode_bytes(item.encode('utf-8'))
- elif field_type == DATE:
- # Date
- # EPOCH_ORDINAL = datetime.datetime(1970, 1, 1).toordinal()
- # The value of EPOCH_ORDINAL is 719163
- self._encode_int(item.toordinal() - 719163)
- elif field_type == TIME:
- # Time
- hour = item.hour
- minute = item.minute
- seconds = item.second
- microsecond = item.microsecond
- milliseconds = hour * 3600000 + minute * 60000 + seconds * 1000 +
microsecond // 1000
- self._encode_int(milliseconds)
-
- cdef void _copy_to_output_buffer(self):
- cdef size_t size
- cdef size_t i
- cdef bint is_realloc
- cdef char bits
- # the length of the variable prefix length will be less than 9 bytes
- if self._output_buffer_size < self._output_pos + self._tmp_output_pos
+ 9:
- self._output_buffer_size += self._tmp_output_buffer_size + 9
- self._output_data = <char*> libc.stdlib.realloc(self._output_data,
-
self._output_buffer_size)
- size = self._tmp_output_pos
- # write variable prefix length
- while size:
- bits = size & 0x7F
- size >>= 7
- if size:
- bits |= 0x80
- self._output_data[self._output_pos] = bits
- self._output_pos += 1
- if self._tmp_output_pos < 8:
- # This is faster than memcpy when the string is short.
- for i in range(self._tmp_output_pos):
- self._output_data[self._output_pos + i] =
self._tmp_output_data[i]
- else:
- libc.string.memcpy(self._output_data + self._output_pos,
self._tmp_output_data,
- self._tmp_output_pos)
- self._output_pos += self._tmp_output_pos
- self._tmp_output_pos = 0
-
- cdef void _maybe_flush(self, OutputStream out_stream):
- # Currently, it will trigger flushing when the size of buffer reach to
10_000_000
- if self._output_pos > 10_000_000:
- self._map_output_data_to_output_stream(out_stream)
- out_stream.flush()
- self._output_pos = 0
-
- cdef void _map_output_data_to_output_stream(self, OutputStream out_stream):
- out_stream.data = self._output_data
- out_stream.pos = self._output_pos
- out_stream.buffer_size = self._output_buffer_size
-
- cdef void _extend(self, size_t missing):
- while self._tmp_output_buffer_size < self._tmp_output_pos + missing:
- self._tmp_output_buffer_size *= 2
- self._tmp_output_data = <char*>
libc.stdlib.realloc(self._tmp_output_data,
-
self._tmp_output_buffer_size)
-
- cdef void _encode_byte(self, unsigned char val):
- if self._tmp_output_buffer_size < self._tmp_output_pos + 1:
- self._extend(1)
- self._tmp_output_data[self._tmp_output_pos] = val
- self._tmp_output_pos += 1
-
- cdef void _encode_smallint(self, libc.stdint.int16_t v):
- if self._tmp_output_buffer_size < self._tmp_output_pos + 2:
- self._extend(2)
- self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 8)
- self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> v
- self._tmp_output_pos += 2
-
- cdef void _encode_int(self, libc.stdint.int32_t v):
- if self._tmp_output_buffer_size < self._tmp_output_pos + 4:
- self._extend(4)
- self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 24)
- self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v
>> 16)
- self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v
>> 8)
- self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> v
- self._tmp_output_pos += 4
-
- cdef void _encode_bigint(self, libc.stdint.int64_t v):
- if self._tmp_output_buffer_size < self._tmp_output_pos + 8:
- self._extend(8)
- self._tmp_output_data[self._tmp_output_pos] = <unsigned char> (v >> 56)
- self._tmp_output_data[self._tmp_output_pos + 1] = <unsigned char> (v
>> 48)
- self._tmp_output_data[self._tmp_output_pos + 2] = <unsigned char> (v
>> 40)
- self._tmp_output_data[self._tmp_output_pos + 3] = <unsigned char> (v
>> 32)
- self._tmp_output_data[self._tmp_output_pos + 4] = <unsigned char> (v
>> 24)
- self._tmp_output_data[self._tmp_output_pos + 5] = <unsigned char> (v
>> 16)
- self._tmp_output_data[self._tmp_output_pos + 6] = <unsigned char> (v
>> 8)
- self._tmp_output_data[self._tmp_output_pos + 7] = <unsigned char> v
- self._tmp_output_pos += 8
-
- cdef void _encode_float(self, float v):
- self._encode_int((<libc.stdint.int32_t*> <char*> &v)[0])
-
- cdef void _encode_double(self, double v):
- self._encode_bigint((<libc.stdint.int64_t*> <char*> &v)[0])
-
- cdef void _encode_bytes(self, char*b):
- cdef libc.stdint.int32_t length = strlen(b)
- self._encode_int(length)
- if self._tmp_output_buffer_size < self._tmp_output_pos + length:
- self._extend(length)
- if length < 8:
- # This is faster than memcpy when the string is short.
- for i in range(length):
- self._tmp_output_data[self._tmp_output_pos + i] = b[i]
- else:
- libc.string.memcpy(self._tmp_output_data + self._tmp_output_pos,
b, length)
- self._tmp_output_pos += length
-
- cdef void _write_null_mask(self, value, libc.stdint.int32_t
leading_complete_bytes_num,
- libc.stdint.int32_t remaining_bits_num):
- cdef libc.stdint.int32_t field_pos, index
- cdef unsigned char*null_byte_search_table
- cdef unsigned char b, i
- field_pos = 0
- null_byte_search_table = self._null_byte_search_table
- for _ in range(leading_complete_bytes_num):
- b = 0x00
- for i in range(8):
- if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
- field_pos += 8
- self._encode_byte(b)
-
- if remaining_bits_num:
- b = 0x00
- for i in range(remaining_bits_num):
- if value[field_pos + i] is None:
- b |= null_byte_search_table[i]
- self._encode_byte(b)
-
- def __dealloc__(self):
- if self.null_mask:
- libc.stdlib.free(self._null_mask)
- if self.null_byte_search_table:
- libc.stdlib.free(self._null_byte_search_table)
- if self._tmp_output_data:
- libc.stdlib.free(self._tmp_output_data)
- if self._output_field_type:
- libc.stdlib.free(self._output_field_type)
- if self._output_coder_type:
- libc.stdlib.free(self._output_coder_type)
-
-cdef class BaseCoder:
- cpdef CoderType coder_type(self):
- return UNDEFINED
-
- cpdef TypeName type_name(self):
- return NONE
-
-cdef class TinyIntCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
-
- cpdef TypeName type_name(self):
- return TINYINT
-
-cdef class SmallIntCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return SMALLINT
-
-cdef class IntCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
-
- cpdef TypeName type_name(self):
- return INT
-
-cdef class BigIntCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return BIGINT
-
-cdef class BooleanCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return BOOLEAN
-
-cdef class FloatCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return FLOAT
-
-cdef class DoubleCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return DOUBLE
-
-cdef class BinaryCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return BINARY
-
-cdef class CharCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
- cpdef TypeName type_name(self):
- return CHAR
-
-cdef class DateCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
-
- cpdef TypeName type_name(self):
- return DATE
-
-cdef class TimeCoderImpl(BaseCoder):
- cpdef CoderType coder_type(self):
- return SIMPLE
-
- cpdef TypeName type_name(self):
- return TIME
diff --git a/flink-python/pyflink/fn_execution/tests/test_coders.py b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
similarity index 95%
rename from flink-python/pyflink/fn_execution/tests/test_coders.py
rename to flink-python/pyflink/fn_execution/tests/test_coders_common.py
index 3134e83..69e9961 100644
--- a/flink-python/pyflink/fn_execution/tests/test_coders.py
+++ b/flink-python/pyflink/fn_execution/tests/test_coders_common.py
@@ -25,16 +25,7 @@ from pyflink.fn_execution.coders import BigIntCoder,
TinyIntCoder, BooleanCoder,
TimeCoder, TimestampCoder, ArrayCoder, MapCoder, DecimalCoder,
FlattenRowCoder, RowCoder, \
LocalZonedTimestampCoder
-try:
- from pyflink.fn_execution import fast_coder_impl # noqa # pylint:
disable=unused-import
- have_cython = True
-except ImportError:
- have_cython = False
-
-
-@unittest.skipIf(have_cython,
- "Found cython implementation, we don't need to test
non-compiled implementation")
class CodersTest(unittest.TestCase):
def check_coder(self, coder, *values):
diff --git a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py b/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
deleted file mode 100644
index 15a8ab0..0000000
--- a/flink-python/pyflink/fn_execution/tests/test_fast_coders.py
+++ /dev/null
@@ -1,141 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-"""Tests common to all coder implementations."""
-import logging
-import unittest
-
-from pyflink.fn_execution import coder_impl
-
-try:
- from pyflink.fn_execution import fast_coder_impl
-
- have_cython = True
-except ImportError:
- have_cython = False
-
-
[email protected](have_cython, "Uncompiled Cython Coder")
-class CodersTest(unittest.TestCase):
-
- def check_cython_coder(self, python_field_coders, cython_field_coders,
data):
- from apache_beam.coders.coder_impl import create_InputStream,
create_OutputStream
- from pyflink.fn_execution.fast_coder_impl import
InputStreamAndFunctionWrapper
- py_flatten_row_coder =
coder_impl.FlattenRowCoderImpl(python_field_coders)
- internal = py_flatten_row_coder.encode(data)
- input_stream = create_InputStream(internal)
- output_stream = create_OutputStream()
- cy_flatten_row_coder =
fast_coder_impl.FlattenRowCoderImpl(cython_field_coders)
- value = cy_flatten_row_coder.decode_from_stream(input_stream, False)
- wrapper_func_input_element = InputStreamAndFunctionWrapper(
- lambda v: [v[i] for i in range(len(v))], value)
- cy_flatten_row_coder.encode_to_stream(wrapper_func_input_element,
output_stream, False)
- generator_result =
py_flatten_row_coder.decode_from_stream(create_InputStream(
- output_stream.get()), False)
- result = []
- for item in generator_result:
- result.append(item)
- try:
- self.assertEqual(result, data)
- except AssertionError:
- self.assertEqual(len(result), len(data))
- self.assertEqual(len(result[0]), len(data[0]))
- for i in range(len(data[0])):
- if isinstance(data[0][i], float):
- from pyflink.table.tests.test_udf import float_equal
- assert float_equal(data[0][i], result[0][i], 1e-6)
- else:
- self.assertEqual(data[0][i], result[0][i])
-
- # decide whether two floats are equal
- @staticmethod
- def float_equal(a, b, rel_tol=1e-09, abs_tol=0.0):
- return abs(a - b) <= max(rel_tol * max(abs(a), abs(b)), abs_tol)
-
- def test_cython_bigint_coder(self):
- data = [1, 100, -100, -1000]
- python_field_coders = [coder_impl.BigIntCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.BigIntCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_tinyint_coder(self):
- data = [1, 10, 127, -128]
- python_field_coders = [coder_impl.TinyIntCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.TinyIntCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_boolean_coder(self):
- data = [True, False]
- python_field_coders = [coder_impl.BooleanCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.BooleanCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_smallint_coder(self):
- data = [32767, -32768, 0]
- python_field_coders = [coder_impl.SmallIntCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.SmallIntCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_int_coder(self):
- data = [-2147483648, 2147483647]
- python_field_coders = [coder_impl.IntCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.IntCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_float_coder(self):
- data = [1.02, 1.32]
- python_field_coders = [coder_impl.FloatCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.FloatCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_double_coder(self):
- data = [-12.02, 1.98932]
- python_field_coders = [coder_impl.DoubleCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.DoubleCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_binary_coder(self):
- data = [b'pyflink']
- python_field_coders = [coder_impl.BinaryCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.BinaryCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_char_coder(self):
- data = ['flink', '🐿']
- python_field_coders = [coder_impl.CharCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.CharCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_date_coder(self):
- import datetime
- data = [datetime.date(2019, 9, 10)]
- python_field_coders = [coder_impl.DateCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.DateCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
- def test_cython_time_coder(self):
- import datetime
- data = [datetime.time(hour=11, minute=11, second=11,
microsecond=123000)]
- python_field_coders = [coder_impl.TimeCoderImpl() for _ in
range(len(data))]
- cython_field_coders = [fast_coder_impl.TimeCoderImpl() for _ in
range(len(data))]
- self.check_cython_coder(python_field_coders, cython_field_coders,
[data])
-
-
-if __name__ == '__main__':
- logging.getLogger().setLevel(logging.INFO)
- unittest.main()
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 4fa842e..c74abeb 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -20,10 +20,9 @@ from __future__ import print_function
import io
import os
import sys
-from distutils.command.build_ext import build_ext
from shutil import copytree, copy, rmtree
-from setuptools import setup, Extension
+from setuptools import setup
if sys.version_info < (3, 5):
print("Python versions prior to 3.5 are not supported for PyFlink.",
@@ -40,26 +39,6 @@ def remove_if_exists(file_path):
rmtree(file_path)
-try:
- from Cython.Build import cythonize
- extensions = cythonize([
- Extension(
- name="pyflink.fn_execution.fast_coder_impl",
- sources=["pyflink/fn_execution/fast_coder_impl.pyx"],
- include_dirs=["pyflink/fn_execution/"])
- ])
-except ImportError:
- if os.path.exists("pyflink/fn_execution/fast_coder_impl.c"):
- extensions = ([
- Extension(
- name="pyflink.fn_execution.fast_coder_impl",
- sources=["pyflink/fn_execution/fast_coder_impl.c"],
- include_dirs=["pyflink/fn_execution/"])
- ])
- else:
- extensions = ([])
-
-
this_directory = os.path.abspath(os.path.dirname(__file__))
version_file = os.path.join(this_directory, 'pyflink/version.py')
@@ -253,19 +232,16 @@ run sdist.
install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0',
'apache-beam==2.19.0',
'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1',
'jsonpickle==1.2',
'pandas>=0.23.4,<=0.25.3',
'pyarrow>=0.15.1,<0.16.0', 'pytz>=2018.3'],
- cmdclass={'build_ext': build_ext},
tests_require=['pytest==4.4.1'],
description='Apache Flink Python API',
long_description=long_description,
long_description_content_type='text/markdown',
- zip_safe=False,
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7'],
- ext_modules=extensions
+ 'Programming Language :: Python :: 3.7']
)
finally:
if in_flink_source:
diff --git a/flink-python/tox.ini b/flink-python/tox.ini
index 2742ed2..b944857 100644
--- a/flink-python/tox.ini
+++ b/flink-python/tox.ini
@@ -21,23 +21,17 @@
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions.
# new environments will be excluded by default unless explicitly added to
envlist.
-envlist = {py35, py36, py37}-cython
+envlist = py35, py36, py37
[testenv]
whitelist_externals=
/bin/bash
deps =
pytest
- apache-beam==2.19.0
- cython==0.28.1
grpcio>=1.17.0,<=1.26.0
grpcio-tools>=1.3.5,<=1.14.2
commands =
python --version
- # python test
- pytest --durations=0
- python setup.py build_ext --inplace
- # cython test
pytest --durations=0
bash ./dev/run_pip_test.sh
# Replace the default installation command with a custom retry installation
script, because on high-speed