This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f583ea2084 Create a varint32 coder and use it for RowCoder (#34354)
5f583ea2084 is described below

commit 5f583ea2084252074290c3ab16bb369ae0f524c6
Author: Yi Hu <[email protected]>
AuthorDate: Mon Mar 24 15:53:31 2025 -0400

    Create a varint32 coder and use it for RowCoder (#34354)
    
    * Create a varint32 coder and use it for RowCoder
    
    * Fix checks
    
    * Handle a race when multiple processes download the jar at the same time
---
 .../beam_PostCommit_Python_Xlang_Gcp_Direct.json   |  2 +-
 sdks/python/apache_beam/coders/coder_impl.pxd      |  5 ++++
 sdks/python/apache_beam/coders/coder_impl.py       | 31 ++++++++++++++++++++++
 sdks/python/apache_beam/coders/coders.py           | 21 ++++++++++++++-
 .../apache_beam/coders/coders_test_common.py       | 14 ++++++++++
 sdks/python/apache_beam/coders/row_coder.py        |  5 +++-
 sdks/python/apache_beam/coders/row_coder_test.py   | 23 ++++++++++++++++
 sdks/python/apache_beam/coders/slow_stream.py      |  7 +++++
 sdks/python/apache_beam/coders/stream.pxd          |  4 +++
 sdks/python/apache_beam/coders/stream.pyx          | 18 ++++++++++++-
 sdks/python/apache_beam/coders/stream_test.py      | 21 +++++++++++++++
 .../io/external/xlang_jdbcio_it_test.py            | 19 ++++++++-----
 sdks/python/apache_beam/utils/subprocess_server.py |  7 ++++-
 13 files changed, 165 insertions(+), 12 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json 
b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
index f1ba03a243e..e0266d62f2e 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Xlang_Gcp_Direct.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "modification": 5
+  "modification": 4
 }
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd 
b/sdks/python/apache_beam/coders/coder_impl.pxd
index 8a28499555c..27cffe7b62d 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -130,6 +130,11 @@ cdef class VarIntCoderImpl(StreamCoderImpl):
   cpdef bytes encode(self, value)
 
 
+cdef class VarInt32CoderImpl(StreamCoderImpl):
+  @cython.locals(ivalue=libc.stdint.int32_t)
+  cpdef bytes encode(self, value)
+
+
 cdef class SingletonCoderImpl(CoderImpl):
   cdef object _value
 
diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
index 5dff3505290..36f0b7a4ab2 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -974,6 +974,37 @@ class VarIntCoderImpl(StreamCoderImpl):
     return get_varint_size(value)
 
 
+class VarInt32CoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for int32 objects."""
+  def encode_to_stream(self, value, out, nested):
+    # type: (int, create_OutputStream, bool) -> None
+    out.write_var_int32(value)
+
+  def decode_from_stream(self, in_stream, nested):
+    # type: (create_InputStream, bool) -> int
+    return in_stream.read_var_int32()
+
+  def encode(self, value):
+    ivalue = value  # type cast
+    if 0 <= ivalue < len(small_ints):
+      return small_ints[ivalue]
+    return StreamCoderImpl.encode(self, value)
+
+  def decode(self, encoded):
+    if len(encoded) == 1:
+      i = ord(encoded)
+      if 0 <= i < 128:
+        return i
+    return StreamCoderImpl.decode(self, encoded)
+
+  def estimate_size(self, value, nested=False):
+    # type: (Any, bool) -> int
+    # Note that VarInts are encoded the same way regardless of nesting.
+    return get_varint_size(value & 0xFFFFFFFF)
+
+
 class SingletonCoderImpl(CoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
diff --git a/sdks/python/apache_beam/coders/coders.py 
b/sdks/python/apache_beam/coders/coders.py
index e6250532aef..cb23e3967e3 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -629,7 +629,7 @@ 
Coder.register_structured_urn(common_urns.coders.NULLABLE.urn, NullableCoder)
 
 
 class VarIntCoder(FastCoder):
-  """Variable-length integer coder."""
+  """Variable-length integer coder  matches Java SDK's VarLongCoder."""
   def _create_impl(self):
     return coder_impl.VarIntCoderImpl()
 
@@ -650,6 +650,25 @@ class VarIntCoder(FastCoder):
 Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)
 
 
+class VarInt32Coder(FastCoder):
+  """Variable-length integer coder matches Java SDK's VarIntCoder."""
+  def _create_impl(self):
+    return coder_impl.VarInt32CoderImpl()
+
+  def is_deterministic(self):
+    # type: () -> bool
+    return True
+
+  def to_type_hint(self):
+    return int
+
+  def __eq__(self, other):
+    return type(self) == type(other)
+
+  def __hash__(self):
+    return hash(type(self))
+
+
 class BigEndianShortCoder(FastCoder):
   """A coder used for big-endian int16 values."""
   def _create_impl(self):
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index f3381cdb1d6..bed93cbc554 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -318,6 +318,20 @@ class CodersTest(unittest.TestCase):
             for k in range(0, int(math.log(MAX_64_BIT_INT)))
         ])
 
+  def test_varint32_coder(self):
+    # Small ints.
+    self.check_coder(coders.VarInt32Coder(), *range(-10, 10))
+    # Multi-byte encoding starts at 128
+    self.check_coder(coders.VarInt32Coder(), *range(120, 140))
+    # Large values
+    MAX_32_BIT_INT = 0x7fffffff
+    self.check_coder(
+        coders.VarIntCoder(),
+        *[
+            int(math.pow(-1, k) * math.exp(k))
+            for k in range(0, int(math.log(MAX_32_BIT_INT)))
+        ])
+
   def test_float_coder(self):
     self.check_coder(
         coders.FloatCoder(), *[float(0.1 * x) for x in range(-100, 100)])
diff --git a/sdks/python/apache_beam/coders/row_coder.py 
b/sdks/python/apache_beam/coders/row_coder.py
index e93abbc887f..dc473b1d6d7 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -33,6 +33,7 @@ from apache_beam.coders.coders import NullableCoder
 from apache_beam.coders.coders import SinglePrecisionFloatCoder
 from apache_beam.coders.coders import StrUtf8Coder
 from apache_beam.coders.coders import TimestampCoder
+from apache_beam.coders.coders import VarInt32Coder
 from apache_beam.coders.coders import VarIntCoder
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import schema_pb2
@@ -142,8 +143,10 @@ def _coder_from_type(field_type):
 def _nonnull_coder_from_type(field_type):
   type_info = field_type.WhichOneof("type_info")
   if type_info == "atomic_type":
-    if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
+    if field_type.atomic_type == schema_pb2.INT64:
       return VarIntCoder()
+    elif field_type.atomic_type == schema_pb2.INT32:
+      return VarInt32Coder()
     if field_type.atomic_type == schema_pb2.INT16:
       return BigEndianShortCoder()
     elif field_type.atomic_type == schema_pb2.FLOAT:
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py 
b/sdks/python/apache_beam/coders/row_coder_test.py
index 6ac982835cb..4d47bca3e2b 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -203,6 +203,29 @@ class RowCoderTest(unittest.TestCase):
     for test_case in self.PEOPLE:
       self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
 
+  def test_row_coder_negative_varint(self):
+    schema = schema_pb2.Schema(
+        id="negative",
+        fields=[
+            schema_pb2.Field(
+                name="i64",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64)),
+            schema_pb2.Field(
+                name="i32",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32))
+        ])
+    coder = RowCoder(schema)
+    Negative = typing.NamedTuple(
+        "Negative", [
+            ("i64", np.int64),
+            ("i32", np.int32),
+        ])
+    test_cases = [
+        Negative(-1, -1023), Negative(-1023, -1), Negative(-2**63, -2**31)
+    ]
+    for test_case in test_cases:
+      self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
+
   @unittest.skip(
       "https://github.com/apache/beam/issues/19696 - Overflow behavior in "
       "VarIntCoder is currently inconsistent")
diff --git a/sdks/python/apache_beam/coders/slow_stream.py 
b/sdks/python/apache_beam/coders/slow_stream.py
index b08ad8e9a37..fb4aa50f233 100644
--- a/sdks/python/apache_beam/coders/slow_stream.py
+++ b/sdks/python/apache_beam/coders/slow_stream.py
@@ -58,6 +58,9 @@ class OutputStream(object):
       if not v:
         break
 
+  def write_var_int32(self, v: int) -> None:
+    self.write_var_int64(int(v) & 0xFFFFFFFF)
+
   def write_bigendian_int64(self, v):
     self.write(struct.pack('>q', v))
 
@@ -156,6 +159,10 @@ class InputStream(object):
       result -= 1 << 64
     return result
 
+  def read_var_int32(self):
+    v = self.read_var_int64()
+    return struct.unpack('<i', struct.pack('<I', v))[0]
+
   def read_bigendian_int64(self):
     return struct.unpack('>q', self.read(8))[0]
 
diff --git a/sdks/python/apache_beam/coders/stream.pxd 
b/sdks/python/apache_beam/coders/stream.pxd
index 97d66aa089a..de5377cc94f 100644
--- a/sdks/python/apache_beam/coders/stream.pxd
+++ b/sdks/python/apache_beam/coders/stream.pxd
@@ -26,6 +26,7 @@ cdef class OutputStream(object):
   cpdef write(self, bytes b, bint nested=*)
   cpdef write_byte(self, unsigned char val)
   cpdef write_var_int64(self, libc.stdint.int64_t v)
+  cpdef write_var_int32(self, libc.stdint.int32_t v)
   cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
   cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v)
   cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
@@ -43,6 +44,8 @@ cdef class ByteCountingOutputStream(OutputStream):
   cdef size_t count
 
   cpdef write(self, bytes b, bint nested=*)
+  cpdef write_var_int64(self, libc.stdint.int64_t val)
+  cpdef write_var_int32(self, libc.stdint.int32_t val)
   cpdef write_byte(self, unsigned char val)
   cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
   cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val)
@@ -61,6 +64,7 @@ cdef class InputStream(object):
   cpdef bytes read(self, size_t len)
   cpdef long read_byte(self) except? -1
   cpdef libc.stdint.int64_t read_var_int64(self) except? -1
+  cpdef libc.stdint.int32_t read_var_int32(self) except? -1
   cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
   cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1
   cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
diff --git a/sdks/python/apache_beam/coders/stream.pyx 
b/sdks/python/apache_beam/coders/stream.pyx
index 3977660f68b..d9de4afbec7 100644
--- a/sdks/python/apache_beam/coders/stream.pyx
+++ b/sdks/python/apache_beam/coders/stream.pyx
@@ -73,6 +73,11 @@ cdef class OutputStream(object):
       if not v:
         break
 
+  cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
+    """Encode an int using variable-length encoding to a stream."""
+    cdef libc.stdint.int64_t v = signed_v & <libc.stdint.int64_t>(0xFFFFFFFF)
+    self.write_var_int64(v)
+
   cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
     self.write_bigendian_uint64(signed_v)
 
@@ -91,7 +96,7 @@ cdef class OutputStream(object):
 
   cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
     cdef libc.stdint.uint32_t v = signed_v
-    if  self.buffer_size < self.pos + 4:
+    if self.buffer_size < self.pos + 4:
       self.extend(4)
     self.data[self.pos    ] = <unsigned char>(v >> 24)
     self.data[self.pos + 1] = <unsigned char>(v >> 16)
@@ -151,6 +156,12 @@ cdef class ByteCountingOutputStream(OutputStream):
   cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
     self.count += get_varint_size(signed_v)
 
+  cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
+    if signed_v < 0:
+      self.count += 5
+    else:
+      self.count += get_varint_size(signed_v)
+
   cpdef write_byte(self, unsigned char _):
     self.count += 1
 
@@ -225,6 +236,11 @@ cdef class InputStream(object):
 
     return result
 
+  cpdef libc.stdint.int32_t read_var_int32(self) except? -1:
+    """Decode a variable-length encoded int32 from a stream."""
+    cdef libc.stdint.int64_t v = self.read_var_int64()
+    return <libc.stdint.int32_t>(v);
+
   cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
     return self.read_bigendian_uint64()
 
diff --git a/sdks/python/apache_beam/coders/stream_test.py 
b/sdks/python/apache_beam/coders/stream_test.py
index 57662056b2a..1e8b2ac1162 100644
--- a/sdks/python/apache_beam/coders/stream_test.py
+++ b/sdks/python/apache_beam/coders/stream_test.py
@@ -92,6 +92,27 @@ class StreamTest(unittest.TestCase):
   def test_large_var_int64(self):
     self.run_read_write_var_int64([0, 2**63 - 1, -2**63, 2**63 - 3])
 
+  def run_read_write_var_int32(self, values):
+    out_s = self.OutputStream()
+    for v in values:
+      out_s.write_var_int32(v)
+    in_s = self.InputStream(out_s.get())
+    for v in values:
+      self.assertEqual(v, in_s.read_var_int32())
+
+  def test_small_var_int32(self):
+    self.run_read_write_var_int32(range(-10, 30))
+
+  def test_medium_var_int32(self):
+    base = -1.7
+    self.run_read_write_var_int32([
+        int(base**pow)
+        for pow in range(1, int(31 * math.log(2) / math.log(-base)))
+    ])
+
+  def test_large_var_int32(self):
+    self.run_read_write_var_int32([0, 2**31 - 1, -2**31, 2**31 - 3])
+
   def test_read_write_double(self):
     values = 0, 1, -1, 1e100, 1.0 / 3, math.pi, float('inf')
     out_s = self.OutputStream()
diff --git a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py 
b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
index 01d868950c0..ca6fa3d711c 100644
--- a/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
+++ b/sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
@@ -61,10 +61,10 @@ MYSQL_BINARY_TYPE = ('BINARY(10)', 'VARBINARY(10)')
 
 JdbcTestRow = typing.NamedTuple(
     "JdbcTestRow",
-    [("f_id", int), ("f_float", float), ("f_char", str), ("f_varchar", str),
-     ("f_bytes", bytes), ("f_varbytes", bytes), ("f_timestamp", Timestamp),
-     ("f_decimal", Decimal), ("f_date", datetime.date),
-     ("f_time", datetime.time)],
+    [("f_id", int), ("f_id_long", int), ("f_float", float), ("f_char", str),
+     ("f_varchar", str), ("f_bytes", bytes), ("f_varbytes", bytes),
+     ("f_timestamp", Timestamp), ("f_decimal", Decimal),
+     ("f_date", datetime.date), ("f_time", datetime.time)],
 )
 coders.registry.register_coder(JdbcTestRow, coders.RowCoder)
 
@@ -72,6 +72,7 @@ CustomSchemaRow = typing.NamedTuple(
     "CustomSchemaRow",
     [
         ("renamed_id", int),
+        ("renamed_id_long", int),
         ("renamed_float", float),
         ("renamed_char", str),
         ("renamed_varchar", str),
@@ -184,7 +185,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
     connection.execute(
         sqlalchemy.text(
             f"CREATE TABLE IF NOT EXISTS {table_name}" +
-            "(f_id INTEGER, f_float DOUBLE PRECISION, " +
+            "(f_id INTEGER, f_id_long BIGINT, f_float DOUBLE PRECISION, " +
             "f_char CHAR(10), f_varchar VARCHAR(10), " +
             f"f_bytes {binary_type[0]}, f_varbytes {binary_type[1]}, " +
             "f_timestamp TIMESTAMP(3), f_decimal DECIMAL(10, 2), " +
@@ -193,7 +194,8 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
   def generate_test_data(self, count):
     return [
         JdbcTestRow(
-            i,
+            i - 3,
+            i - 3,
             i + 0.1,
             f'Test{i}',
             f'Test{i}',
@@ -225,6 +227,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
 
       expected_rows.append(
           JdbcTestRow(
+              row.f_id,
               row.f_id,
               row.f_float,
               f_char,
@@ -310,6 +313,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
 
       expected_rows.append(
           CustomSchemaRow(
+              row.f_id,
               row.f_id,
               row.f_float,
               f_char,
@@ -324,6 +328,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
     def custom_row_equals(expected, actual):
       return (
           expected.renamed_id == actual.renamed_id and
+          expected.renamed_id_long == actual.renamed_id_long and
           expected.renamed_float == actual.renamed_float and
           expected.renamed_char.rstrip() == actual.renamed_char.rstrip() and
           expected.renamed_varchar == actual.renamed_varchar and
@@ -390,7 +395,7 @@ class CrossLanguageJdbcIOTest(unittest.TestCase):
         SimpleRow(2, "Item2", 20.75),
         SimpleRow(3, "Item3", 30.25),
         SimpleRow(4, "Item4", 40.0),
-        SimpleRow(5, "Item5", 50.5)
+        SimpleRow(-5, "Item5", 50.5)
     ]
 
     config = self.jdbc_configs[database]
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py 
b/sdks/python/apache_beam/utils/subprocess_server.py
index efb27715cd8..85d9286bddd 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -423,7 +423,12 @@ class JavaJarServer(SubprocessServer):
             url_read = urlopen(url)
           with open(cached_jar + '.tmp', 'wb') as jar_write:
             shutil.copyfileobj(url_read, jar_write, length=1 << 20)
-          os.rename(cached_jar + '.tmp', cached_jar)
+          try:
+            os.rename(cached_jar + '.tmp', cached_jar)
+          except FileNotFoundError:
+            # A race when multiple programs run in parallel and the cached_jar
+            # is already moved. Safe to ignore.
+            pass
         except URLError as e:
           raise RuntimeError(
               f'Unable to fetch remote job server jar at {url}: {e}. If no '

Reply via email to