gemini-code-assist[bot] commented on code in PR #36840:
URL: https://github.com/apache/beam/pull/36840#discussion_r2538627503
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java:
##########
@@ -147,6 +149,22 @@ private static void setBeamValueToKey(
keyBuilder.append(row.getDecimal(columnName));
break;
// TODO: Implement logical date and datetime
+ case LOGICAL_TYPE:
+ Schema.LogicalType<?, ?> logicalType =
checkNotNull(field.getLogicalType());
+ String identifier = logicalType.getIdentifier();
+ if (identifier.equals(MicrosInstant.IDENTIFIER)) {
+ Instant instant = row.getValue(columnName);
+ if (instant == null) {
+ keyBuilder.append((Timestamp) null);
+ } else {
+ long micros = instant.getEpochSecond() * 1_000_000L +
instant.getNano() / 1_000L;
+ keyBuilder.append(Timestamp.ofTimeMicroseconds(micros));
Review Comment:

The logic to convert a `java.time.Instant` to a `com.google.cloud.Timestamp`
with microsecond precision is duplicated in `setBeamValueToKey`,
`setBeamValueToMutation` (lines 253-254), and `addIterableToMutationBuilder`
(lines 384-387). To improve code clarity and maintainability, consider
extracting this conversion into a private static helper method.
For example:
```java
/**
 * Converts an {@link Instant} to a Spanner {@link Timestamp}, dropping any sub-microsecond digits
 * of the nanosecond field.
 *
 * <p>Seconds and nanos are handed to {@code Timestamp} separately instead of first being folded
 * into a single microsecond {@code long}. Folding them would require
 * {@code epochSecond * 1_000_000L}, which silently overflows for instants far from the epoch.
 *
 * @param instant the instant to convert; must not be null
 * @return the equivalent Spanner timestamp, truncated to microsecond precision
 */
private static Timestamp toSpannerTimestamp(Instant instant) {
  // Round the nanosecond field down to a whole microsecond (getNano() is never negative).
  int microAlignedNanos = (instant.getNano() / 1_000) * 1_000;
  return Timestamp.ofTimeSecondsAndNanos(instant.getEpochSecond(), microAlignedNanos);
}
```
This new method could then be used in all three locations, simplifying the
code.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/StructUtils.java:
##########
@@ -365,7 +366,23 @@ private static void addIterableToStructBuilder(
return struct.getBytes(column).toByteArray();
// TODO: implement logical datetime
case TIMESTAMP:
- return
Instant.ofEpochSecond(struct.getTimestamp(column).getSeconds()).toDateTime();
+ Timestamp spannerTimestamp = struct.getTimestamp(column);
+
+ // Check if the Beam schema expects MicrosInstant logical type
+ Schema.FieldType fieldType = field.getType();
+ if (fieldType.getTypeName().isLogicalType()) {
+ Schema.@Nullable LogicalType<?, ?> logicalType =
fieldType.getLogicalType();
+ if (logicalType != null &&
logicalType.getIdentifier().equals(MicrosInstant.IDENTIFIER)) {
+ // Convert to java.time.Instant with microsecond precision
+ long micros =
+ spannerTimestamp.getSeconds() * 1_000_000L +
spannerTimestamp.getNanos() / 1_000L;
+ return java.time.Instant.ofEpochSecond(
+ micros / 1_000_000L, (micros % 1_000_000L) * 1_000L);
Review Comment:

The logic to convert a `com.google.cloud.Timestamp` to a `java.time.Instant`
is duplicated here and in `getStructArrayValue` (lines 437-440). To avoid
duplication and improve maintainability, this logic should be extracted into a
private static helper method.
For example, you could add:
```java
/**
 * Converts a Spanner {@link Timestamp} to a {@link java.time.Instant}, keeping only microsecond
 * precision: any nanoseconds below a whole microsecond are dropped.
 *
 * @param spannerTimestamp the Spanner timestamp to convert; must not be null
 * @return the equivalent instant, truncated to microsecond precision
 */
private static java.time.Instant fromSpannerTimestamp(Timestamp spannerTimestamp) {
  long epochSeconds = spannerTimestamp.getSeconds();
  // Integer division truncates toward zero, so this rounds the nanosecond field
  // to a whole number of microseconds.
  long microAlignedNanos = (spannerTimestamp.getNanos() / 1_000L) * 1_000L;
  return java.time.Instant.ofEpochSecond(epochSeconds, microAlignedNanos);
}
```
And then call it here:
```java
return fromSpannerTimestamp(spannerTimestamp);
```
##########
sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:
##########
@@ -118,76 +128,115 @@ def tearDown(self):
def test_spanner_insert_or_update(self):
self.spanner_helper.insert_values(
- self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+ self.database_id,
+ [('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+ ('or_update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
def to_row_fn(i):
return SpannerTestRow(
- f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+ f_int64=i,
+ f_string=f'or_update{i}',
+ f_boolean=i % 2 == 0,
+ f_timestamp=Timestamp.of(1234567890.0 + i))
self.run_write_pipeline(3, to_row_fn, SpannerTestRow,
SpannerInsertOrUpdate)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='or_update'),
- [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+ results = self.spanner_helper.read_data(
+ self.database_id, prefix='or_update')
+ self.assertEqual(len(results), 3)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'or_update{i}')
+ self.assertEqual(row[1], i)
+ self.assertEqual(row[2], i % 2 == 0)
+ self.assertIsNotNone(row[3]) # timestamp field
def test_spanner_insert(self):
def to_row_fn(num):
return SpannerTestRow(
- f_string=f'insert{num}', f_int64=num, f_boolean=None)
+ f_string=f'insert{num}',
+ f_int64=num,
+ f_boolean=None,
+ f_timestamp=Timestamp.of(1234567890.0 + num))
self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
def compare_row(row):
return row[1]
- self.assertEqual(
- sorted(
- self.spanner_helper.read_data(self.database_id, 'insert'),
- key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+ results = sorted(
+ self.spanner_helper.read_data(self.database_id, 'insert'),
+ key=compare_row)
+
+ self.assertEqual(len(results), 1000)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'insert{i}')
+ self.assertEqual(row[1], i)
+ self.assertIsNone(row[2])
+ self.assertIsNotNone(row[3])
def test_spanner_replace(self):
self.spanner_helper.insert_values(
- self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+ self.database_id,
+ [('replace0', 0, True, Timestamp.of(1234567890.0).to_rfc3339()),
+ ('replace1', 1, False, Timestamp.of(1234567891.0).to_rfc3339())])
def to_row_fn(num):
- return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+ return SpannerPartTestRow(
+ f_string=f'replace{num}',
+ f_int64=num + 10,
+ f_timestamp=Timestamp.of(1234567900.0 + num))
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='replace'),
- [['replace0', 10, None], ['replace1', 11, None]])
+ results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+ self.assertEqual(len(results), 2)
+ # In REPLACE, boolean should be NULL but timestamp should be updated
+ self.assertEqual(results[0][0], 'replace0')
+ self.assertEqual(results[0][1], 10)
+ self.assertIsNone(results[0][2]) # boolean replaced with NULL
+ self.assertIsNotNone(results[0][3]) # timestamp updated
def test_spanner_update(self):
self.spanner_helper.insert_values(
- self.database_id, [('update0', 5, False), ('update1', 9, False)])
+ self.database_id,
+ [('update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+ ('update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
def to_row_fn(num):
- return SpannerPartTestRow(f_string=f'update{num}', f_int64=num + 10)
+ return SpannerPartTestRow(
+ f_string=f'update{num}',
+ f_int64=num + 10,
+ f_timestamp=Timestamp.of(1234567900.0 + num))
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerUpdate)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, 'update'),
- [['update0', 10, False], ['update1', 11, False]])
+ results = self.spanner_helper.read_data(self.database_id, 'update')
+ self.assertEqual(len(results), 2)
+ # In UPDATE, boolean preserved but timestamp updated
+ self.assertEqual(results[0][1], 10)
+ self.assertEqual(results[0][2], False) # boolean preserved
+ self.assertIsNotNone(results[0][3]) # timestamp updated
Review Comment:

The assertions for this test are incomplete. They check only `results[0][1]`
through `results[0][3]` of the first result row, skipping its key in
`results[0][0]`, and do not check the second row at all. To make the test more
robust, the assertions should be expanded to cover all relevant fields for both
rows.
```python
results = self.spanner_helper.read_data(self.database_id, 'update')
self.assertEqual(len(results), 2)
# In UPDATE, boolean preserved but timestamp updated
self.assertEqual(results[0][0], 'update0')
self.assertEqual(results[0][1], 10)
self.assertEqual(results[0][2], False) # boolean preserved
self.assertIsNotNone(results[0][3]) # timestamp updated
self.assertEqual(results[1][0], 'update1')
self.assertEqual(results[1][1], 11)
self.assertEqual(results[1][2], False) # boolean preserved
self.assertIsNotNone(results[1][3]) # timestamp updated
```
##########
sdks/python/apache_beam/io/gcp/tests/xlang_spannerio_it_test.py:
##########
@@ -118,76 +128,115 @@ def tearDown(self):
def test_spanner_insert_or_update(self):
self.spanner_helper.insert_values(
- self.database_id, [('or_update0', 5, False), ('or_update1', 9, False)])
+ self.database_id,
+ [('or_update0', 5, False, Timestamp.of(1234567890.0).to_rfc3339()),
+ ('or_update1', 9, False, Timestamp.of(1234567891.0).to_rfc3339())])
def to_row_fn(i):
return SpannerTestRow(
- f_int64=i, f_string=f'or_update{i}', f_boolean=i % 2 == 0)
+ f_int64=i,
+ f_string=f'or_update{i}',
+ f_boolean=i % 2 == 0,
+ f_timestamp=Timestamp.of(1234567890.0 + i))
self.run_write_pipeline(3, to_row_fn, SpannerTestRow,
SpannerInsertOrUpdate)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='or_update'),
- [[f'or_update{i}', i, i % 2 == 0] for i in range(3)])
+ results = self.spanner_helper.read_data(
+ self.database_id, prefix='or_update')
+ self.assertEqual(len(results), 3)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'or_update{i}')
+ self.assertEqual(row[1], i)
+ self.assertEqual(row[2], i % 2 == 0)
+ self.assertIsNotNone(row[3]) # timestamp field
def test_spanner_insert(self):
def to_row_fn(num):
return SpannerTestRow(
- f_string=f'insert{num}', f_int64=num, f_boolean=None)
+ f_string=f'insert{num}',
+ f_int64=num,
+ f_boolean=None,
+ f_timestamp=Timestamp.of(1234567890.0 + num))
self.run_write_pipeline(1000, to_row_fn, SpannerTestRow, SpannerInsert)
def compare_row(row):
return row[1]
- self.assertEqual(
- sorted(
- self.spanner_helper.read_data(self.database_id, 'insert'),
- key=compare_row), [[f'insert{i}', i, None] for i in range(1000)])
+ results = sorted(
+ self.spanner_helper.read_data(self.database_id, 'insert'),
+ key=compare_row)
+
+ self.assertEqual(len(results), 1000)
+ for i, row in enumerate(results):
+ self.assertEqual(row[0], f'insert{i}')
+ self.assertEqual(row[1], i)
+ self.assertIsNone(row[2])
+ self.assertIsNotNone(row[3])
def test_spanner_replace(self):
self.spanner_helper.insert_values(
- self.database_id, [('replace0', 0, True), ('replace1', 1, False)])
+ self.database_id,
+ [('replace0', 0, True, Timestamp.of(1234567890.0).to_rfc3339()),
+ ('replace1', 1, False, Timestamp.of(1234567891.0).to_rfc3339())])
def to_row_fn(num):
- return SpannerPartTestRow(f_string=f'replace{num}', f_int64=num + 10)
+ return SpannerPartTestRow(
+ f_string=f'replace{num}',
+ f_int64=num + 10,
+ f_timestamp=Timestamp.of(1234567900.0 + num))
self.run_write_pipeline(2, to_row_fn, SpannerPartTestRow, SpannerReplace)
- self.assertEqual(
- self.spanner_helper.read_data(self.database_id, prefix='replace'),
- [['replace0', 10, None], ['replace1', 11, None]])
+ results = self.spanner_helper.read_data(self.database_id, prefix='replace')
+ self.assertEqual(len(results), 2)
+ # In REPLACE, boolean should be NULL but timestamp should be updated
+ self.assertEqual(results[0][0], 'replace0')
+ self.assertEqual(results[0][1], 10)
+ self.assertIsNone(results[0][2]) # boolean replaced with NULL
+ self.assertIsNotNone(results[0][3]) # timestamp updated
Review Comment:

The assertions in this test case only validate the first row of the results.
Since two rows are written to Spanner, it would be more robust to assert the
contents of both rows to ensure the `SpannerReplace` transform works as
expected for all inputs.
```python
results = self.spanner_helper.read_data(self.database_id,
prefix='replace')
self.assertEqual(len(results), 2)
# In REPLACE, boolean should be NULL but timestamp should be updated
self.assertEqual(results[0][0], 'replace0')
self.assertEqual(results[0][1], 10)
self.assertIsNone(results[0][2]) # boolean replaced with NULL
self.assertIsNotNone(results[0][3]) # timestamp updated
self.assertEqual(results[1][0], 'replace1')
self.assertEqual(results[1][1], 11)
self.assertIsNone(results[1][2]) # boolean replaced with NULL
self.assertIsNotNone(results[1][3]) # timestamp updated
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]