vahmed-hamdy commented on code in PR #152:
URL: 
https://github.com/apache/flink-connector-aws/pull/152#discussion_r1694261617


##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +552,132 @@ void testInstantArray() {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
+    @Test
+    void testDeleteOnlyPrimaryKey() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new 
HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField".  "key" is 
the single primary key.
+        // For a Delete request, only "key" should be included in the 
expectedResult, and not "otherField".
+        DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, 
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = 
createElementWithMultipleFields(StringData.fromString(value), 
StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult =
+                singletonMap(key, AttributeValue.builder().s(value).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+    }
+
+    @Test
+    void testDeleteOnlyPrimaryKeys() {
+        String key = "key";
+        String value = "some_string";
+        String additionalKey = "additional_key";
+        String additionalValue = "additional_value";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new HashSet<>();
+        primaryKeys.add(key);
+        primaryKeys.add(additionalKey);
+
+        // Create a Row with three fields - "key", "additional_key", and 
"otherField".
+        // "key" and "additional_key" make up the composite primary key.
+        // For a Delete request, only "key" and "additional_key" should be 
included in the expectedResult, and not "otherField".
+        DataType dataType = DataTypes.ROW(
+                DataTypes.FIELD(key, DataTypes.STRING()),
+                DataTypes.FIELD(additionalKey, DataTypes.STRING()),
+                DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = createElementWithMultipleFields(
+                StringData.fromString(value), 
StringData.fromString(additionalValue), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(additionalKey, 
AttributeValue.builder().s(additionalValue).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);

Review Comment:
   nit: could be done on one assert using containsExactly



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -43,14 +47,35 @@ public class RowDataToAttributeValueConverter {
 
     private final DataType physicalDataType;
     private final TableSchema<RowData> tableSchema;
+    private final Set<String> primaryKeys;
 
-    public RowDataToAttributeValueConverter(DataType physicalDataType) {
+    public RowDataToAttributeValueConverter(DataType physicalDataType, 
Set<String> primaryKeys) {

Review Comment:
   nit: could we overload the constructor instead and pass empty set on default?



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +552,132 @@ void testInstantArray() {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
+    @Test
+    void testDeleteOnlyPrimaryKey() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new 
HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField".  "key" is 
the single primary key.
+        // For a Delete request, only "key" should be included in the 
expectedResult, and not "otherField".
+        DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, 
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = 
createElementWithMultipleFields(StringData.fromString(value), 
StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult =
+                singletonMap(key, AttributeValue.builder().s(value).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);

Review Comment:
   nit: could be done on one assert using `containsExactly`



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +552,132 @@ void testInstantArray() {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
+    @Test

Review Comment:
   Could we add tests for delete with no primary key and verify exception



##########
flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java:
##########
@@ -547,12 +552,132 @@ void testInstantArray() {
         assertThat(actualResult).containsAllEntriesOf(expectedResult);
     }
 
+    @Test
+    void testDeleteOnlyPrimaryKey() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new 
HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField".  "key" is 
the single primary key.
+        // For a Delete request, only "key" should be included in the 
expectedResult, and not "otherField".
+        DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, 
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = 
createElementWithMultipleFields(StringData.fromString(value), 
StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult =
+                singletonMap(key, AttributeValue.builder().s(value).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+    }
+
+    @Test
+    void testDeleteOnlyPrimaryKeys() {
+        String key = "key";
+        String value = "some_string";
+        String additionalKey = "additional_key";
+        String additionalValue = "additional_value";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new HashSet<>();
+        primaryKeys.add(key);
+        primaryKeys.add(additionalKey);
+
+        // Create a Row with three fields - "key", "additional_key", and 
"otherField".
+        // "key" and "additional_key" make up the composite primary key.
+        // For a Delete request, only "key" and "additional_key" should be 
included in the expectedResult, and not "otherField".
+        DataType dataType = DataTypes.ROW(
+                DataTypes.FIELD(key, DataTypes.STRING()),
+                DataTypes.FIELD(additionalKey, DataTypes.STRING()),
+                DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = createElementWithMultipleFields(
+                StringData.fromString(value), 
StringData.fromString(additionalValue), StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.DELETE);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(additionalKey, 
AttributeValue.builder().s(additionalValue).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);
+        assertThat(expectedResult).containsAllEntriesOf(actualResult);
+    }
+
+    @Test
+    void testPKIgnoredForInsert() {
+        String key = "key";
+        String value = "some_string";
+        String otherField = "other_field";
+        String otherValue = "other_value";
+        Set<String> primaryKeys = new 
HashSet<>(Collections.singletonList(key));
+
+        // Create a Row with two fields - "key" and "otherField".  "key" is 
the primary key.
+        // For an Insert request, all fields should be included regardless of 
the Primary Key.
+        DataType dataType = DataTypes.ROW(DataTypes.FIELD(key, 
DataTypes.STRING()), DataTypes.FIELD(otherField, DataTypes.STRING()));
+        RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(dataType, primaryKeys);
+        RowData rowData = 
createElementWithMultipleFields(StringData.fromString(value), 
StringData.fromString(otherValue));
+        rowData.setRowKind(RowKind.INSERT);
+
+        Map<String, AttributeValue> actualResult =
+                rowDataToAttributeValueConverter.convertRowData(rowData);
+        Map<String, AttributeValue> expectedResult = new HashMap<>();
+        expectedResult.put(key, AttributeValue.builder().s(value).build());
+        expectedResult.put(otherField, 
AttributeValue.builder().s(otherValue).build());
+
+        assertThat(actualResult).containsAllEntriesOf(expectedResult);

Review Comment:
   nit: could be done on one assert using containsExactly



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to