This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch release-2.51.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.51.0 by this push:
new 70f4a1ae26d CP for #28624 into release 2.51.0 (Bigtable Python
timestamp bug fix) (#28634)
70f4a1ae26d is described below
commit 70f4a1ae26d71fc89c3f5a99cae4e417cd1ee446
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Sep 27 17:42:18 2023 -0400
CP for #28624 into release 2.51.0 (Bigtable Python timestamp bug fix)
(#28634)
---
.../BigtableWriteSchemaTransformProvider.java | 13 +++++++------
.../BigtableWriteSchemaTransformProviderIT.java | 19 +++++++++++--------
sdks/python/apache_beam/io/gcp/bigtableio.py | 7 +++----
sdks/python/apache_beam/io/gcp/bigtableio_it_test.py | 18 ++++++++++++++++++
4 files changed, 39 insertions(+), 18 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
index d38bdae2f09..b99b69621a8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java
@@ -179,12 +179,13 @@ public class BigtableWriteSchemaTransformProvider
.setColumnQualifier(
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
.setFamilyNameBytes(
-
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
- if (mutation.containsKey("timestamp_micros")) {
- setMutation =
- setMutation.setTimestampMicros(
-
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
- }
+
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
+ // Use timestamp if provided, else default to -1 (current
Bigtable server time)
+ .setTimestampMicros(
+ mutation.containsKey("timestamp_micros")
+ ? Longs.fromByteArray(
+
ofNullable(mutation.get("timestamp_micros")).get())
+ : -1);
bigtableMutation =
Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
index 14bb04b0315..1a60fe661b5 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProviderIT.java
@@ -154,8 +154,8 @@ public class BigtableWriteSchemaTransformProviderIT {
public void testSetMutationsExistingColumn() {
RowMutation rowMutation =
RowMutation.create(tableId, "key-1")
- .setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
- .setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
+ .setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
+ .setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
dataClient.mutateRow(rowMutation);
List<Map<String, byte[]>> mutations = new ArrayList<>();
@@ -165,13 +165,15 @@ public class BigtableWriteSchemaTransformProviderIT {
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
- "family_name",
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
+ "family_name",
COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
+ "timestamp_micros", Longs.toByteArray(2000)));
mutations.add(
ImmutableMap.of(
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
- "family_name",
COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
+ "family_name",
COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
+ "timestamp_micros", Longs.toByteArray(2000)));
Row mutationRow =
Row.withSchema(SCHEMA)
.withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
@@ -202,10 +204,11 @@ public class BigtableWriteSchemaTransformProviderIT {
.collect(Collectors.toList());
assertEquals(2, cellsColA.size());
assertEquals(2, cellsColC.size());
- System.out.println(cellsColA);
- System.out.println(cellsColC);
- assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
- assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
+ // Bigtable keeps cell history ordered by descending timestamp
+ assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
+ assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
+ assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
+ assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
}
@Test
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py
b/sdks/python/apache_beam/io/gcp/bigtableio.py
index b2b52bd675c..f8534f38ddf 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -252,11 +252,10 @@ class WriteToBigTable(beam.PTransform):
"type": b'SetCell',
"family_name": mutation.set_cell.family_name.encode('utf-8'),
"column_qualifier": mutation.set_cell.column_qualifier,
- "value": mutation.set_cell.value
+ "value": mutation.set_cell.value,
+ "timestamp_micros": struct.pack(
+ '>q', mutation.set_cell.timestamp_micros)
}
- micros = mutation.set_cell.timestamp_micros
- if micros > -1:
- mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
elif mutation.__contains__("delete_from_column"):
mutation_dict = {
"type": b'DeleteFromColumn',
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 341f2983c8b..f61e346cff9 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -223,6 +223,9 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
row1_col2_cell = Cell(b'val1-2', 200_000_000)
row2_col1_cell = Cell(b'val2-1', 100_000_000)
row2_col2_cell = Cell(b'val2-2', 200_000_000)
+ # When setting this cell, we won't set a timestamp. We expect the timestamp
+ # to default to -1, and Bigtable will set it to system time at insertion.
+ row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
# rows sent to write transform
row1.set_cell(
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
@@ -232,6 +235,8 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
row2.set_cell(
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
+ # don't set a timestamp here. it should default to -1
+ row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)
self.run_pipeline([row1, row2])
@@ -249,6 +254,19 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
self.assertEqual(
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])
+ # check mutation that doesn't have a timestamp set is handled properly:
+ self.assertEqual(
+ row2_col1_no_timestamp.value,
+ actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
+ # Bigtable sets timestamp as insertion time, which is later than the
+ # time.time() we set when creating this test case
+ cell_timestamp = actual_row2.find_cells('col_fam',
+ b'col-no-timestamp')[0].timestamp
+ self.assertTrue(
+ row2_col1_no_timestamp.timestamp < cell_timestamp,
+ msg="Expected cell with unset timestamp to have ingestion time "
+ f"attached, but was {cell_timestamp}")
+
def test_delete_cells_mutation(self):
col_fam = self.table.column_family('col_fam')
col_fam.create()