z3d1k commented on code in PR #152:
URL:
https://github.com/apache/flink-connector-aws/pull/152#discussion_r1711207726
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -43,14 +47,35 @@ public class RowDataToAttributeValueConverter {
private final DataType physicalDataType;
private final TableSchema<RowData> tableSchema;
+ private final Set<String> primaryKeys;
- public RowDataToAttributeValueConverter(DataType physicalDataType) {
+ public RowDataToAttributeValueConverter(DataType physicalDataType,
Set<String> primaryKeys) {
Review Comment:
Instead of duplicating logic, it would be better for the constructor with
defaults to call the constructor that takes all parameters. This way there is
only one place to change if required:
```
public RowDataToAttributeValueConverter(DataType physicalDataType) {
this(physicalDataType, Collections.emptySet());
}
```
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverterTest.java:
##########
@@ -45,10 +48,17 @@ public class RowDataElementConverterTest {
DataTypes.FIELD("partition_key", DataTypes.STRING()),
DataTypes.FIELD("payload", DataTypes.STRING()));
private static final RowDataElementConverter elementConverter =
- new RowDataElementConverter(DATA_TYPE);
+ new RowDataElementConverter(DATA_TYPE, null);
private static final SinkWriter.Context context = new
UnusedSinkWriterContext();
private static final RowDataToAttributeValueConverter
rowDataToAttributeValueConverter =
- new RowDataToAttributeValueConverter(DATA_TYPE);
+ new RowDataToAttributeValueConverter(DATA_TYPE, null);
+
+ private static final Set<String> primaryKeys =
+ new HashSet<>(Collections.singletonList("partition_key"));
Review Comment:
nit: You can use `Collections.singleton(...)` to create an immutable Set with
a single element.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -51,7 +58,7 @@ void testChar() {
DataType dataType = DataTypes.ROW(DataTypes.FIELD(key,
DataTypes.CHAR(9)));
RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
- new RowDataToAttributeValueConverter(dataType);
+ new RowDataToAttributeValueConverter(dataType, null);
Review Comment:
Here and below, you can remove the `null` argument since you've added a
constructor overload.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +552,132 @@ void testInstantArray() {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}
+ @Test
+ void testDeleteOnlyPrimaryKey() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new
HashSet<>(Collections.singletonList(key));
+
+ // Create a Row with two fields - "key" and "otherField". "key" is
the single primary key.
+ // For a Delete request, only "key" should be included in the
expectedResult, and not "otherField".
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key,
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
createElementWithMultipleFields(StringData.fromString(value),
StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.DELETE);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult =
+ singletonMap(key, AttributeValue.builder().s(value).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ assertThat(expectedResult).containsAllEntriesOf(actualResult);
+ }
+
+ @Test
+ void testDeleteOnlyPrimaryKeys() {
+ String key = "key";
+ String value = "some_string";
+ String additionalKey = "additional_key";
+ String additionalValue = "additional_value";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new HashSet<>();
+ primaryKeys.add(key);
+ primaryKeys.add(additionalKey);
+
+ // Create a Row with three fields - "key", "additional_key", and
"otherField".
+ // "key" and "additional_key" make up the composite primary key.
+ // For a Delete request, only "key" and "additional_key" should be
included in the expectedResult, and not "otherField".
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD(key, DataTypes.STRING()),
+ DataTypes.FIELD(additionalKey, DataTypes.STRING()),
+ DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData = createElementWithMultipleFields(
+ StringData.fromString(value),
StringData.fromString(additionalValue), StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.DELETE);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult = new HashMap<>();
+ expectedResult.put(key, AttributeValue.builder().s(value).build());
+ expectedResult.put(additionalKey,
AttributeValue.builder().s(additionalValue).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ assertThat(expectedResult).containsAllEntriesOf(actualResult);
+ }
+
+ @Test
+ void testPKIgnoredForInsert() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new
HashSet<>(Collections.singletonList(key));
+
+ // Create a Row with two fields - "key" and "otherField". "key" is
the primary key.
+ // For an Insert request, all fields should be included regardless of
the Primary Key.
+ DataType dataType = DataTypes.ROW(DataTypes.FIELD(key,
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
createElementWithMultipleFields(StringData.fromString(value),
StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.INSERT);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult = new HashMap<>();
+ expectedResult.put(key, AttributeValue.builder().s(value).build());
+ expectedResult.put(otherField,
AttributeValue.builder().s(otherValue).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
Review Comment:
You can use `assertThat(...).containsExactlyInAnyOrderEntriesOf(...)` here
to check that the maps contain the same entries without worrying about order.
##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +554,180 @@ void testInstantArray() {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}
+ @Test
+ void testDeleteExceptionWhenNoPrimaryKey() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = Collections.emptySet();
+
+ // Create a Row with two fields - "key" and "otherField". "key" is
the single primary key.
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(key, DataTypes.STRING()),
+ DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
+ createElementWithMultipleFields(
+ StringData.fromString(value),
StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.DELETE);
+
+ assertThrows(
+ TableException.class,
+ () -> {
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ });
+ }
+
+ void testDeleteOnlyPrimaryKey() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new
HashSet<>(Collections.singletonList(key));
+
+ // Create a Row with two fields - "key" and "otherField". "key" is
the single primary key.
+ // For a Delete request, only "key" should be included in the
expectedResult, and not
+ // "otherField".
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(key, DataTypes.STRING()),
+ DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
+ createElementWithMultipleFields(
+ StringData.fromString(value),
StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.DELETE);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult =
+ singletonMap(key, AttributeValue.builder().s(value).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ assertThat(expectedResult).containsAllEntriesOf(actualResult);
+ }
+
+ @Test
+ void testDeleteOnlyPrimaryKeys() {
+ String key = "key";
+ String value = "some_string";
+ String additionalKey = "additional_key";
+ String additionalValue = "additional_value";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new HashSet<>();
+ primaryKeys.add(key);
+ primaryKeys.add(additionalKey);
+
+ // Create a Row with three fields - "key", "additional_key", and
"otherField".
+ // "key" and "additional_key" make up the composite primary key.
+ // For a Delete request, only "key" and "additional_key" should be
included in the
+ // expectedResult, and not "otherField".
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(key, DataTypes.STRING()),
+ DataTypes.FIELD(additionalKey, DataTypes.STRING()),
+ DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
+ createElementWithMultipleFields(
+ StringData.fromString(value),
+ StringData.fromString(additionalValue),
+ StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.DELETE);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult = new HashMap<>();
+ expectedResult.put(key, AttributeValue.builder().s(value).build());
+ expectedResult.put(additionalKey,
AttributeValue.builder().s(additionalValue).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ assertThat(expectedResult).containsAllEntriesOf(actualResult);
+ }
+
+ @Test
+ void testPKIgnoredForInsert() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new
HashSet<>(Collections.singletonList(key));
+
+ // Create a Row with two fields - "key" and "otherField". "key" is
the primary key.
+ // For an Insert request, all fields should be included regardless of
the Primary Key.
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(key, DataTypes.STRING()),
+ DataTypes.FIELD(otherField, DataTypes.STRING()));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType, primaryKeys);
+ RowData rowData =
+ createElementWithMultipleFields(
+ StringData.fromString(value),
StringData.fromString(otherValue));
+ rowData.setRowKind(RowKind.INSERT);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(rowData);
+ Map<String, AttributeValue> expectedResult = new HashMap<>();
+ expectedResult.put(key, AttributeValue.builder().s(value).build());
+ expectedResult.put(otherField,
AttributeValue.builder().s(otherValue).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ assertThat(expectedResult).containsAllEntriesOf(actualResult);
+ }
+
+ @Test
+ void testPKIgnoredForUpdateAfter() {
+ String key = "key";
+ String value = "some_string";
+ String otherField = "other_field";
+ String otherValue = "other_value";
+ Set<String> primaryKeys = new
HashSet<>(Collections.singletonList(key));
Review Comment:
nit: `Collections.singleton(key)`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]