This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5f583ea2084 Create a varint32 coder and used it for RowCoder (#34354)
5f583ea2084 is described below
commit 5f583ea2084252074290c3ab16bb369ae0f524c6
Author: Yi Hu <[email protected]>
AuthorDate: Mon Mar 24 15:53:31 2025 -0400
Create a varint32 coder and used it for RowCoder (#34354)
* Create a varint32 coder and used it for RowCoder
* Fix checks
* Handle a race when multiple processes download the jar at the same time
---
.../beam_PostCommit_Python_Xlang_Gcp_Direct.json | 2 +-
sdks/python/apache_beam/coders/coder_impl.pxd | 5 ++++
sdks/python/apache_beam/coders/coder_impl.py | 31 ++++++++++++++++++++++
sdks/python/apache_beam/coders/coders.py | 21 ++++++++++++++-
.../apache_beam/coders/coders_test_common.py | 14 ++++++++++
sdks/python/apache_beam/coders/row_coder.py | 5 +++-
sdks/python/apache_beam/coders/row_coder_test.py | 23 ++++++++++++++++
sdks/python/apache_beam/coders/slow_stream.py | 7 +++++
sdks/python/apache_beam/coders/stream.pxd | 4 +++
sdks/python/apache_beam/coders/stream.pyx | 18 ++++++++++++-
sdks/python/apache_beam/coders/stream_test.py | 21 +++++++++++++++
.../io/external/xlang_jdbcio_it_test.py | 19 ++++++++-----
sdks/python/apache_beam/utils/subprocess_server.py | 7 ++++-
13 files changed, 165 insertions(+), 12 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index f1ba03a243e..e0266d62f2e 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 5
+ "modification": 4
}
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd
b/sdks/python/apache_beam/coders/coder_impl.pxd
index 8a28499555c..27cffe7b62d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -130,6 +130,11 @@ cdef class VarIntCoderImpl(StreamCoderImpl):
cpdef bytes encode(self, value)
+cdef class VarInt32CoderImpl(StreamCoderImpl):
+ @cython.locals(ivalue=libc.stdint.int32_t)
+ cpdef bytes encode(self, value)
+
+
cdef class SingletonCoderImpl(CoderImpl):
cdef object _value
diff --git a/sdks/python/apache_beam/coders/coder_impl.py
b/sdks/python/apache_beam/coders/coder_impl.py
index 5dff3505290..36f0b7a4ab2 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -974,6 +974,37 @@ class VarIntCoderImpl(StreamCoderImpl):
return get_varint_size(value)
+class VarInt32CoderImpl(StreamCoderImpl):
+ """For internal use only; no backwards-compatibility guarantees.
+
+ A coder for int32 objects."""
+ def encode_to_stream(self, value, out, nested):
+ # type: (int, create_OutputStream, bool) -> None
+ out.write_var_int32(value)
+
+ def decode_from_stream(self, in_stream, nested):
+ # type: (create_InputStream, bool) -> int
+ return in_stream.read_var_int32()
+
+ def encode(self, value):
+ ivalue = value # type cast
+ if 0 <= ivalue < len(small_ints):
+ return small_ints[ivalue]
+ return StreamCoderImpl.encode(self, value)
+
+ def decode(self, encoded):
+ if len(encoded) == 1:
+ i = ord(encoded)
+ if 0 <= i < 128:
+ return i
+ return StreamCoderImpl.decode(self, encoded)
+
+ def estimate_size(self, value, nested=False):
+ # type: (Any, bool) -> int
+ # Note that VarInts are encoded the same way regardless of nesting.
+ return get_varint_size(value & 0xFFFFFFFF)
+
+
class SingletonCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
diff --git a/sdks/python/apache_beam/coders/coders.py
b/sdks/python/apache_beam/coders/coders.py
index e6250532aef..cb23e3967e3 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -629,7 +629,7 @@
Coder.register_structured_urn(common_urns.coders.NULLABLE.urn, NullableCoder)
class VarIntCoder(FastCoder):
- """Variable-length integer coder."""
+ """Variable-length integer coder that matches Java SDK's VarLongCoder."""
def _create_impl(self):
return coder_impl.VarIntCoderImpl()
@@ -650,6 +650,25 @@ class VarIntCoder(FastCoder):
Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)
+class VarInt32Coder(FastCoder):
+ """Variable-length integer coder that matches Java SDK's VarIntCoder."""
+ def _create_impl(self):
+ return coder_impl.VarInt32CoderImpl()
+
+ def is_deterministic(self):
+ # type: () -> bool
+ return True
+
+ def to_type_hint(self):
+ return int
+
+ def __eq__(self, other):
+ return type(self) == type(other)
+
+ def __hash__(self):
+ return hash(type(self))
+
+
class BigEndianShortCoder(FastCoder):
"""A coder used for big-endian int16 values."""
def _create_impl(self):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py
b/sdks/python/apache_beam/coders/coders_test_common.py
index f3381cdb1d6..bed93cbc554 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -318,6 +318,20 @@ class CodersTest(unittest.TestCase):
for k in range(0, int(math.log(MAX_64_BIT_INT)))
])
+ def test_varint32_coder(self):
+ # Small ints.
+ self.check_coder(coders.VarInt32Coder(), *range(-10, 10))
+ # Multi-byte encoding starts at 128
+ self.check_coder(coders.VarInt32Coder(), *range(120, 140))
+ # Large values
+ MAX_32_BIT_INT = 0x7fffffff
+ self.check_coder(
+ coders.VarIntCoder(),
+ *[
+ int(math.pow(-1, k) * math.exp(k))
+ for k in range(0, int(math.log(MAX_32_BIT_INT)))
+ ])
+
def test_float_coder(self):
self.check_coder(
coders.FloatCoder(), *[float(0.1 * x) for x in range(-100, 100)])
diff --git a/sdks/python/apache_beam/coders/row_coder.py
b/sdks/python/apache_beam/coders/row_coder.py
index e93abbc887f..dc473b1d6d7 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -33,6 +33,7 @@ from apache_beam.coders.coders import NullableCoder
from apache_beam.coders.coders import SinglePrecisionFloatCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.coders.coders import TimestampCoder
+from apache_beam.coders.coders import VarInt32Coder
from apache_beam.coders.coders import VarIntCoder
from apache_beam.portability import common_urns
from apache_beam.portability.api import schema_pb2
@@ -142,8 +143,10 @@ def _coder_from_type(field_type):
def _nonnull_coder_from_type(field_type):
type_info = field_type.WhichOneof("type_info")
if type_info == "atomic_type":
- if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
+ if field_type.atomic_type == schema_pb2.INT64:
return VarIntCoder()
+ elif field_type.atomic_type == schema_pb2.INT32:
+ return VarInt32Coder()
if field_type.atomic_type == schema_pb2.INT16:
return BigEndianShortCoder()
elif field_type.atomic_type == schema_pb2.FLOAT:
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py
b/sdks/python/apache_beam/coders/row_coder_test.py
index 6ac982835cb..4d47bca3e2b 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -203,6 +203,29 @@ class RowCoderTest(unittest.TestCase):
for test_case in self.PEOPLE:
self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
+ def test_row_coder_negative_varint(self):
+ schema = schema_pb2.Schema(
+ id="negative",
+ fields=[
+ schema_pb2.Field(
+ name="i64",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64)),
+ schema_pb2.Field(
+ name="i32",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32))
+ ])
+ coder = RowCoder(schema)
+ Negative = typing.NamedTuple(
+ "Negative", [
+ ("i64", np.int64),
+ ("i32", np.int32),
+ ])
+ test_cases = [
+ Negative(-1, -1023), Negative(-1023, -1), Negative(-2**63, -2**31)
+ ]
+ for test_case in test_cases:
+ self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
+
@unittest.skip(
"https://github.com/apache/beam/issues/19696 - Overflow behavior in "
"VarIntCoder is currently inconsistent")
diff --git a/sdks/python/apache_beam/coders/slow_stream.py
b/sdks/python/apache_beam/coders/slow_stream.py
index b08ad8e9a37..fb4aa50f233 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -58,6 +58,9 @@ class OutputStream(object):
if not v:
break
+ def write_var_int32(self, v: int) -> None:
+ self.write_var_int64(int(v) & 0xFFFFFFFF)
+
def write_bigendian_int64(self, v):
self.write(struct.pack('>q', v))
@@ -156,6 +159,10 @@ class InputStream(object):
result -= 1 << 64
return result
+ def read_var_int32(self):
+ v = self.read_var_int64()
+ return struct.unpack('<i', struct.pack('<I', v))[0]
+
def read_bigendian_int64(self):
return struct.unpack('>q', self.read(8))[0]
diff --git a/sdks/python/apache_beam/coders/stream.pxd
b/sdks/python/apache_beam/coders/stream.pxd
index 97d66aa089a..de5377cc94f 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -26,6 +26,7 @@ cdef class OutputStream(object):
cpdef write(self, bytes b, bint nested=*)
cpdef write_byte(self, unsigned char val)
cpdef write_var_int64(self, libc.stdint.int64_t v)
+ cpdef write_var_int32(self, libc.stdint.int32_t v)
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v)
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
@@ -43,6 +44,8 @@ cdef class ByteCountingOutputStream(OutputStream):
cdef size_t count
cpdef write(self, bytes b, bint nested=*)
+ cpdef write_var_int64(self, libc.stdint.int64_t val)
+ cpdef write_var_int32(self, libc.stdint.int32_t val)
cpdef write_byte(self, unsigned char val)
cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val)
@@ -61,6 +64,7 @@ cdef class InputStream(object):
cpdef bytes read(self, size_t len)
cpdef long read_byte(self) except? -1
cpdef libc.stdint.int64_t read_var_int64(self) except? -1
+ cpdef libc.stdint.int32_t read_var_int32(self) except? -1
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
diff --git a/sdks/python/apache_beam/coders/stream.pyx
b/sdks/python/apache_beam/coders/stream.pyx
index 3977660f68b..d9de4afbec7 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -73,6 +73,11 @@ cdef class OutputStream(object):
if not v:
break
+ cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
+ """Encode an int using variable-length encoding to a stream."""
+ cdef libc.stdint.int64_t v = signed_v & <libc.stdint.int64_t>(0xFFFFFFFF)
+ self.write_var_int64(v)
+
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
self.write_bigendian_uint64(signed_v)
@@ -91,7 +96,7 @@ cdef class OutputStream(object):
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
cdef libc.stdint.uint32_t v = signed_v
- if self.buffer_size < self.pos + 4:
+ if self.buffer_size < self.pos + 4:
self.extend(4)
self.data[self.pos ] = <unsigned char>(v >> 24)
self.data[self.pos + 1] = <unsigned char>(v >> 16)
@@ -151,6 +156,12 @@ cdef class ByteCountingOutputStream(OutputStream):
cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
self.count += get_varint_size(signed_v)
+ cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
+ if signed_v < 0:
+ self.count += 5
+ else:
+ self.count += get_varint_size(signed_v)
+
cpdef write_byte(self, unsigned char _):
self.count += 1
@@ -225,6 +236,11 @@ cdef class InputStream(object):
return result
+ cpdef libc.stdint.int32_t read_var_int32(self) except? -1:
+ """Decode a variable-length encoded int32 from a stream."""
+ cdef libc.stdint.int64_t v = self.read_var_int64()
+ return <libc.stdint.int32_t>(v);
+
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
return self.read_bigendian_uint64()
diff --git a/sdks/python/apache_beam/coders/stream_test.py
b/sdks/python/apache_beam/coders/stream_test.py
index 57662056b2a..1e8b2ac1162 100644
--- a/sdks/python/apache_beam/coders/stream_test.py
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -92,6 +92,27 @@ class StreamTest(unittest.TestCase):
def test_large_var_int64(self):
self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])
+ def run_read_write_var_int32(self, values):
+ out_s = self.OutputStream()
+ for v in values:
+ out_s.write_var_int32(v)
+ in_s = self.InputStream(out_s.get())
+ for v in values:
+ self.assertEqual(v, in_s.read_var_int32())
+
+ def test_small_var_int32(self):
+ self.run_read_write_var_int32(range(-10, 30))
+
+ def test_medium_var_int32(self):
+ base = -1.7
+ self.run_read_write_var_int32([
+ int(base**pow)
+ for pow in range(1, int(31 * math.log(2) / math.log(-base)))
+ ])
+
+ def test_large_var_int32(self):
+ self.run_read_write_var_int32([0, 2**31 - 1, -2**31, 2**31 - 3])
+
def test_read_write_double(self):
values = 0, 1, -1, 1e100, 1.0 / 3, math.pi, float('inf')
out_s = self.OutputStream()
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index 01d868950c0..ca6fa3d711c 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -61,10 +61,10 @@ MYSQL_BINARY_TYPE = ('BINARY(10)', 'VARBINARY(10)')
JdbcTestRow = typing.NamedTuple(
"JdbcTestRow",
- [("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
- ("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
- ("f_decimal", Decimal), ("f_date", datetime.date),
- ("f_time", datetime.time)],
+ [("f_id", int), ("f_id_long", int), ("f_float", float), ("f_char", str),
+ ("f_varchar", str), ("f_bytes", bytes), ("f_varbytes", bytes),
+ ("f_timestamp", Timestamp), ("f_decimal", Decimal),
+ ("f_date", datetime.date), ("f_time", datetime.time)],
)
coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
@@ -72,6 +72,7 @@ CustomSchemaRow = typing.NamedTuple(
"CustomSchemaRow",
[
("renamed_id", int),
+ ("renamed_id_long", int),
("renamed_float", float),
("renamed_char", str),
("renamed_varchar", str),
@@ -184,7 +185,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
connection.execute(
sqlalchemy.text(
f"CREATE TABLE IF NOT EXISTS {table_name}" +
- "(f_id INTEGER, f_float DOUBLE PRECISION, " +
+ "(f_id INTEGER, f_id_long BIGINT, f_float DOUBLE PRECISION, " +
"f_char CHAR(10), f_varchar VARCHAR(10), " +
f"f_bytes {binary_type[0]}, f_varbytes {binary_type[1]}, " +
"f_timestamp TIMESTAMP(3), f_decimal DECIMAL(10, 2), " +
@@ -193,7 +194,8 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
def generate_test_data(self, count):
return [
JdbcTestRow(
- i,
+ i - 3,
+ i - 3,
i + 0.1,
f'Test{i}',
f'Test{i}',
@@ -225,6 +227,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
expected_rows.append(
JdbcTestRow(
+ row.f_id,
row.f_id,
row.f_float,
f_char,
@@ -310,6 +313,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
expected_rows.append(
CustomSchemaRow(
+ row.f_id,
row.f_id,
row.f_float,
f_char,
@@ -324,6 +328,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
def custom_row_equals(expected, actual):
return (
expected.renamed_id == actual.renamed_id and
+ expected.renamed_id_long == actual.renamed_id_long and
expected.renamed_float == actual.renamed_float and
expected.renamed_char.rstrip() == actual.renamed_char.rstrip() and
expected.renamed_varchar == actual.renamed_varchar and
@@ -390,7 +395,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
SimpleRow(2, "Item2", 20.75),
SimpleRow(3, "Item3", 30.25),
SimpleRow(4, "Item4", 40.0),
- SimpleRow(5, "Item5", 50.5)
+ SimpleRow(-5, "Item5", 50.5)
]
config = self.jdbc_configs[database]
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py
b/sdks/python/apache_beam/utils/subprocess_server.py
index efb27715cd8..85d9286bddd 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -423,7 +423,12 @@ class JavaJarServer(SubprocessServer):
url_read = urlopen(url)
with open(cached_jar + '.tmp', 'wb') as jar_write:
shutil.copyfileobj(url_read, jar_write, length=1 << 20)
- os.rename(cached_jar + '.tmp', cached_jar)
+ try:
+ os.rename(cached_jar + '.tmp', cached_jar)
+ except FileNotFoundError:
+ # Race: when multiple programs run in parallel, another one may have
+ # already moved the .tmp file to cached_jar. Safe to ignore.
+ pass
except URLError as e:
raise RuntimeError(
f'Unable to fetch remote job server jar at {url}: {e}. If no '