gemini-code-assist[bot] commented on code in PR #36840:
URL: https://github.com/apache/beam/pull/36840#discussion_r2538627503


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java:
##########
@@ -147,6 +149,22 @@ private static void setBeamValueToKey(
         keyBuilder.append(row.getDecimal(columnName));
         break;
         // TODO: Implement logical date and datetime
+      case LOGICAL_TYPE:
+        Schema.LogicalType<?, ?> logicalType = 
checkNotNull(field.getLogicalType());
+        String identifier = logicalType.getIdentifier();
+        if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+          Instant instant = row.getValue(columnName);
+          if (instant == null) {
+            keyBuilder.append((Timestamp) null);
+          } else {
+            long micros = instant.getEpochSecond() * 1_000_000L + 
instant.getNano() / 1_000L;
+            keyBuilder.append(Timestamp.ofTimeMicroseconds(micros));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic to convert a `java.time.Instant` to a `com.google.cloud.Timestamp` 
with microsecond precision is duplicated in `setBeamValueToKey`, 
`setBeamValueToMutation` (lines 253-254), and `addIterableToMutationBuilder` 
(lines 384-387). To improve code clarity and maintainability, consider 
extracting this conversion into a private static helper method.
   
   For example:
   ```java
   private static Timestamp toSpannerTimestamp(Instant instant) {
     long micros = instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1_000L;
     return Timestamp.ofTimeMicroseconds(micros);
   }
   ```
   ```
   
   This new method could then be used in all three locations, simplifying the 
code.



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java:
##########
@@ -365,7 +366,23 @@ private static void addIterableToStructBuilder(
         return struct.getBytes(column).toByteArray();
         // TODO: implement logical datetime
       case TIMESTAMP:
-        return 
Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
+        Timestamp spannerTimestamp = struct.getTimestamp(column);
+
+        // Check if the Beam schema expects MicrosInstant logical type
+        Schema.FieldType fieldType = field.getType();
+        if (fieldType.getTypeName().isLogicalType()) {
+          Schema.@Nullable LogicalType<?, ?> logicalType = 
fieldType.getLogicalType();
+          if (logicalType != null && 
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+            // Convert to java.time.Instant with microsecond precision
+            long micros =
+                spannerTimestamp.getSeconds() * 1_000_000L + 
spannerTimestamp.getNanos() / 1_000L;
+            return java.time.Instant.ofEpochSecond(
+                micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic to convert a `com.google.cloud.Timestamp` to a `java.time.Instant` 
is duplicated here and in `getStructArrayValue` (lines 437-440). To avoid 
duplication and improve maintainability, this logic should be extracted into a 
private static helper method.
   
   For example, you could add:
   ```java
   private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
     long micros = spannerTimestamp.getSeconds() * 1_000_000L + spannerTimestamp.getNanos() / 1_000L;
     return java.time.Instant.ofEpochSecond(micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
   }
   ```
   And then call it here:
   ```java
   return fromSpannerTimestamp(spannerTimestamp);
   ```



##########
sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:
##########
@@ -118,76 +128,115 @@ def tearDown(self):
 
   def test_spanner_insert_or_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+        self.database_id,
+        [('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+         ('or_update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
 
     def to_row_fn(i):
       return SpannerTestRow(
-          f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+          f_int64=i,
+          f_string=f'or_update{i}',
+          f_boolean=i % 2 == 0,
+          f_timestamp=Timestamp.of(1234567890.0 + i))
 
     self.run_write_pipeline(3, to_row_fn, SpannerTestRow, 
SpannerInsertOrUpdate)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='or_update'),
-        [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+    results = self.spanner_helper.read_data(
+        self.database_id, prefix='or_update')
+    self.assertEqual(len(results), 3)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'or_update{i}')
+      self.assertEqual(row[1], i)
+      self.assertEqual(row[2], i % 2 == 0)
+      self.assertIsNotNone(row[3])  # timestamp field
 
   def test_spanner_insert(self):
     def to_row_fn(num):
       return SpannerTestRow(
-          f_string=f'insert{num}', f_int64=num, f_boolean=None)
+          f_string=f'insert{num}',
+          f_int64=num,
+          f_boolean=None,
+          f_timestamp=Timestamp.of(1234567890.0 + num))
 
     self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
 
     def compare_row(row):
       return row[1]
 
-    self.assertEqual(
-        sorted(
-            self.spanner_helper.read_data(self.database_id, 'insert'),
-            key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+    results = sorted(
+        self.spanner_helper.read_data(self.database_id, 'insert'),
+        key=compare_row)
+
+    self.assertEqual(len(results), 1000)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'insert{i}')
+      self.assertEqual(row[1], i)
+      self.assertIsNone(row[2])
+      self.assertIsNotNone(row[3])
 
   def test_spanner_replace(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+        self.database_id,
+        [('replace0', 0, True, Timestamp.of(1234567890.0).to_rfc3339()),
+         ('replace1', 1, False, Timestamp.of(1234567891.0).to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'replace{num}',
+          f_int64=num + 10,
+          f_timestamp=Timestamp.of(1234567900.0 + num))
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='replace'),
-        [['replace0', 10, None], ['replace1', 11, None]])
+    results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+    self.assertEqual(len(results), 2)
+    # In REPLACE, boolean should be NULL but timestamp should be updated
+    self.assertEqual(results[0][0], 'replace0')
+    self.assertEqual(results[0][1], 10)
+    self.assertIsNone(results[0][2])  # boolean replaced with NULL
+    self.assertIsNotNone(results[0][3])  # timestamp updated
 
   def test_spanner_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('update0', 5, False), ('update1', 9, False)])
+        self.database_id,
+        [('update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+         ('update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'update{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'update{num}',
+          f_int64=num + 10,
+          f_timestamp=Timestamp.of(1234567900.0 + num))
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, 'update'),
-        [['update0', 10, False], ['update1', 11, False]])
+    results = self.spanner_helper.read_data(self.database_id, 'update')
+    self.assertEqual(len(results), 2)
+    # In UPDATE, boolean preserved but timestamp updated
+    self.assertEqual(results[0][1], 10)
+    self.assertEqual(results[0][2], False)  # boolean preserved
+    self.assertIsNotNone(results[0][3])  # timestamp updated

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The assertions for this test are incomplete. They only check some values of 
the first result row and do not check the second row at all. To make the test 
more robust, the assertions should be expanded to cover all relevant fields for 
both rows.
   
   ```python
       results = self.spanner_helper.read_data(self.database_id, 'update')
       self.assertEqual(len(results), 2)
       # In UPDATE, boolean preserved but timestamp updated
       self.assertEqual(results[0][0], 'update0')
       self.assertEqual(results[0][1], 10)
       self.assertEqual(results[0][2], False)  # boolean preserved
       self.assertIsNotNone(results[0][3])  # timestamp updated
   
       self.assertEqual(results[1][0], 'update1')
       self.assertEqual(results[1][1], 11)
       self.assertEqual(results[1][2], False)  # boolean preserved
       self.assertIsNotNone(results[1][3])  # timestamp updated
   ```



##########
sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:
##########
@@ -118,76 +128,115 @@ def tearDown(self):
 
   def test_spanner_insert_or_update(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+        self.database_id,
+        [('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+         ('or_update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
 
     def to_row_fn(i):
       return SpannerTestRow(
-          f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+          f_int64=i,
+          f_string=f'or_update{i}',
+          f_boolean=i % 2 == 0,
+          f_timestamp=Timestamp.of(1234567890.0 + i))
 
     self.run_write_pipeline(3, to_row_fn, SpannerTestRow, 
SpannerInsertOrUpdate)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='or_update'),
-        [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+    results = self.spanner_helper.read_data(
+        self.database_id, prefix='or_update')
+    self.assertEqual(len(results), 3)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'or_update{i}')
+      self.assertEqual(row[1], i)
+      self.assertEqual(row[2], i % 2 == 0)
+      self.assertIsNotNone(row[3])  # timestamp field
 
   def test_spanner_insert(self):
     def to_row_fn(num):
       return SpannerTestRow(
-          f_string=f'insert{num}', f_int64=num, f_boolean=None)
+          f_string=f'insert{num}',
+          f_int64=num,
+          f_boolean=None,
+          f_timestamp=Timestamp.of(1234567890.0 + num))
 
     self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
 
     def compare_row(row):
       return row[1]
 
-    self.assertEqual(
-        sorted(
-            self.spanner_helper.read_data(self.database_id, 'insert'),
-            key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+    results = sorted(
+        self.spanner_helper.read_data(self.database_id, 'insert'),
+        key=compare_row)
+
+    self.assertEqual(len(results), 1000)
+    for i, row in enumerate(results):
+      self.assertEqual(row[0], f'insert{i}')
+      self.assertEqual(row[1], i)
+      self.assertIsNone(row[2])
+      self.assertIsNotNone(row[3])
 
   def test_spanner_replace(self):
     self.spanner_helper.insert_values(
-        self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+        self.database_id,
+        [('replace0', 0, True, Timestamp.of(1234567890.0).to_rfc3339()),
+         ('replace1', 1, False, Timestamp.of(1234567891.0).to_rfc3339())])
 
     def to_row_fn(num):
-      return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+      return SpannerPartTestRow(
+          f_string=f'replace{num}',
+          f_int64=num + 10,
+          f_timestamp=Timestamp.of(1234567900.0 + num))
 
     self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
 
-    self.assertEqual(
-        self.spanner_helper.read_data(self.database_id, prefix='replace'),
-        [['replace0', 10, None], ['replace1', 11, None]])
+    results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+    self.assertEqual(len(results), 2)
+    # In REPLACE, boolean should be NULL but timestamp should be updated
+    self.assertEqual(results[0][0], 'replace0')
+    self.assertEqual(results[0][1], 10)
+    self.assertIsNone(results[0][2])  # boolean replaced with NULL
+    self.assertIsNotNone(results[0][3])  # timestamp updated

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The assertions in this test case only validate the first row of the results. 
Since two rows are written to Spanner, it would be more robust to assert the 
contents of both rows to ensure the `SpannerReplace` transform works as 
expected for all inputs.
   
   ```python
       results = self.spanner_helper.read_data(self.database_id, prefix='replace')
       self.assertEqual(len(results), 2)
       # In REPLACE, boolean should be NULL but timestamp should be updated
       self.assertEqual(results[0][0], 'replace0')
       self.assertEqual(results[0][1], 10)
       self.assertIsNone(results[0][2])  # boolean replaced with NULL
       self.assertIsNotNone(results[0][3])  # timestamp updated
   
       self.assertEqual(results[1][0], 'replace1')
       self.assertEqual(results[1][1], 11)
       self.assertIsNone(results[1][2])  # boolean replaced with NULL
       self.assertIsNotNone(results[1][3])  # timestamp updated
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to