dannycranmer commented on code in PR #1:
URL: https://github.com/apache/flink-connector-dynamodb/pull/1#discussion_r965965165


##########
flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/DynamoDbAttributeValueUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbAttributeValue;
+import org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbRequest;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Utils class to convert this sink's representations into AWS SDK v2 DynamoDB classes. */
+@Internal
+public class DynamoDbAttributeValueUtils {
+
+    public static WriteRequest toWriteRequest(DynamoDbRequest dynamoDbRequest) {
+        if (dynamoDbRequest.putRequest() != null) {
+            return WriteRequest.builder()
+                    .putRequest(
+                            PutRequest.builder()
+                                    .item(toAttributeValueMap(dynamoDbRequest.putRequest().item()))
+                                    .build())
+                    .build();
+        } else if (dynamoDbRequest.deleteRequest() != null) {

Review Comment:
   The `DynamoDbRequestConverter` allows creation of a `dynamoDbRequest` containing both `putRequest` and `deleteRequest`, yet the `deleteRequest` would be dropped here. Either support both or only one, but this needs to be handled consistently throughout.
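   
   For illustration, a minimal sketch of the kind of guard I mean, against this PR's model (method name hypothetical):
   
   ```java
   // Sketch only: fail fast instead of silently dropping the deleteRequest
   // when both requests are present on the same DynamoDbRequest.
   private static void validateExactlyOneRequestSet(DynamoDbRequest dynamoDbRequest) {
       boolean hasPut = dynamoDbRequest.putRequest() != null;
       boolean hasDelete = dynamoDbRequest.deleteRequest() != null;
       if (hasPut == hasDelete) { // both set, or neither set
           throw new IllegalArgumentException(
                   "A DynamoDbRequest must contain exactly one of putRequest or deleteRequest");
       }
   }
   ```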



##########
flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkElementConverter.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbAttributeValueUtils;
+import org.apache.flink.util.Preconditions;
+
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+/**
+ * An implementation of the {@link ElementConverter} that uses the AWS DynamoDb SDK v2. The user
+ * needs to provide a {@link DynamoDbRequestConverter} of the {@code InputT} to transform it into a
+ * {@link DynamoDbWriteRequest} that may be persisted.
+ */
+@Internal
+public class DynamoDbSinkElementConverter<InputT>

Review Comment:
   It looks like we have created wrappers around the AWS SDK DynamoDB model within the sink. Can you please explain your reasoning behind this approach? The idea behind hiding the ElementConverter is to 1/ hide the internal implementation and 2/ enable the sink to manage de/serialisation. However, I am struggling to see the benefits here since the two models are very similar. If we are copying the underlying model, it might be best to use that model directly, since users might already be creating these objects.
   
   Did you consider using a more generic `ElementConverter`, like the [DynamoDBMapper](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBMapper.html)? I am not familiar with this, so I am not sure if there are performance/feature limitations.
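   
   As a rough sketch of the alternative, assuming the sink's request-entry type were the SDK `WriteRequest` itself (all names below are hypothetical and untested):
   
   ```java
   import java.io.Serializable;
   
   import org.apache.flink.api.connector.sink2.SinkWriter;
   import org.apache.flink.connector.base.sink.writer.ElementConverter;
   
   import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
   
   // Hypothetical: users build the SDK v2 model directly, so the sink keeps
   // no parallel wrapper classes.
   public class SdkWriteRequestElementConverter<InputT>
           implements ElementConverter<InputT, WriteRequest> {
   
       /** Serializable mapper so the converter can be shipped to task managers. */
       public interface SdkRequestMapper<T> extends Serializable {
           WriteRequest map(T element);
       }
   
       private final SdkRequestMapper<InputT> mapper;
   
       public SdkWriteRequestElementConverter(SdkRequestMapper<InputT> mapper) {
           this.mapper = mapper;
       }
   
       @Override
       public WriteRequest apply(InputT element, SinkWriter.Context context) {
           return mapper.map(element);
       }
   }
   ```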



##########
flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbAttributeValue.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Represents a DynamoDb Attribute value.
+ *
+ * <p>The following fields represent the different data types in DynamoDB. Only <b>one</b> field
+ * should be set.
+ *
+ * <ul>
+ *   <li>s - String
+ *   <li>n - Number
+ *   <li>b - Binary
+ *   <li>bool - Boolean
+ *   <li>nul - Null
+ *   <li>ss - String set
+ *   <li>ns - Number set
+ *   <li>bs - Binary set
+ *   <li>l - List of DynamoDbAttributeValue
+ *   <li>m - Map of String to DynamoDbAttributeValue
+ * </ul>
+ */
+@PublicEvolving
+public class DynamoDbAttributeValue implements Serializable {
+
+    private final Boolean nul;
+    private final String s;
+    private final String n;
+    private final byte[] b;
+    private final Boolean bool;
+    private final Set<String> ss;
+    private final Set<String> ns;
+    private final Set<byte[]> bs;
+    private final List<DynamoDbAttributeValue> l;
+    private final Map<String, DynamoDbAttributeValue> m;
+

Review Comment:
   What happens if the user sets multiple fields? I assume this would result in some fields being dropped, but there is nothing to prevent it.
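   
   A minimal sketch of the kind of check I have in mind (helper name hypothetical):
   
   ```java
   // Sketch: enforce the "only one field should be set" contract at
   // construction time instead of only documenting it in the Javadoc.
   private static void checkExactlyOneFieldSet(Object... fields) {
       long set = java.util.Arrays.stream(fields).filter(java.util.Objects::nonNull).count();
       if (set != 1) {
           throw new IllegalArgumentException(
                   "Exactly one DynamoDbAttributeValue field must be set, but " + set + " were");
       }
   }
   
   // e.g. invoked from the constructor:
   // checkExactlyOneFieldSet(nul, s, n, b, bool, ss, ns, bs, l, m);
   ```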



##########
pom.xml:
##########
@@ -0,0 +1,1140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"

Review Comment:
   I will open a separate PR to include the majority of this pom, so we can decouple from yours.



##########
flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/config/DynamoDbTablesConfig.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Represents DynamoDB tables configuration. */
+@PublicEvolving
+public class DynamoDbTablesConfig implements Serializable {

Review Comment:
   We are adding complexity here to support multiple tables. I wonder if it is best to support a single table per sink, so that the user creates a separate operator per table. This is in line with the JDBC connector; what do you think? Do you typically use multiple tables per sink in your jobs?
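   
   i.e. rather than one sink fanning out to many tables, roughly the following; the builder API here is purely illustrative, not this PR's:
   
   ```java
   // Hypothetical usage sketch: one sink operator per table, mirroring the
   // JDBC connector. Types, converters and streams are assumed to exist.
   DynamoDbSink<Order> ordersSink =
           DynamoDbSink.<Order>builder()
                   .setTableName("orders")
                   .setDynamoDbRequestConverter(orderConverter)
                   .build();
   DynamoDbSink<Customer> customersSink =
           DynamoDbSink.<Customer>builder()
                   .setTableName("customers")
                   .setDynamoDbRequestConverter(customerConverter)
                   .build();
   
   orderStream.sinkTo(ordersSink);
   customerStream.sinkTo(customersSink);
   ```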



##########
pom.xml:
##########
@@ -0,0 +1,1140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <parent>
+               <groupId>org.apache</groupId>
+               <artifactId>apache</artifactId>
+               <version>20</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connectors</artifactId>
+       <version>1.16-SNAPSHOT</version>
+       <name>Flink : Connectors : </name>

Review Comment:
   ?



##########
pom.xml:
##########
@@ -0,0 +1,1140 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+       <parent>
+               <groupId>org.apache</groupId>
+               <artifactId>apache</artifactId>
+               <version>20</version>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-connectors</artifactId>
+       <version>1.16-SNAPSHOT</version>
+       <name>Flink : Connectors : </name>
+       <packaging>pom</packaging>
+       <url>https://flink.apache.org</url>
+       <inceptionYear>2022</inceptionYear>
+
+       <licenses>
+               <license>
+                       <name>The Apache Software License, Version 2.0</name>
+                       <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+                       <distribution>repo</distribution>
+               </license>
+       </licenses>
+
+       <scm>
+               <url>https://github.com/apache/flink-connector-dynamodb</url>
+               <connection>git@github.com:apache/flink-connector-dynamodb.git</connection>
+               <developerConnection>
+                       scm:git:https://gitbox.apache.org/repos/asf/flink-connector-dynamodb.git
+               </developerConnection>
+       </scm>
+
+       <modules>
+               <module>flink-connector-aws-dynamodb</module>
+       </modules>
+
+       <properties>
+               <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+               <flink.version>1.16-SNAPSHOT</flink.version>
+               <flink.shaded.version>15.0</flink.shaded.version>
+               <flink.shaded.jackson.version>2.12.4</flink.shaded.jackson.version>
+               <target.java.version>1.8</target.java.version>
+               <maven.compiler.source>${target.java.version}</maven.compiler.source>
+               <maven.compiler.target>${target.java.version}</maven.compiler.target>
+               <scala.version>2.12.7</scala.version>
+               <scala.binary.version>2.12</scala.binary.version>
+
+               <junit4.version>4.13.2</junit4.version>
+               <junit5.version>5.8.1</junit5.version>
+               <assertj.version>3.21.0</assertj.version>
+               <archunit.version>0.22.0</archunit.version>
+               <testcontainers.version>1.16.2</testcontainers.version>
+               <mockito.version>2.21.0</mockito.version>
+               <spotless.version>2.4.2</spotless.version>
+
+               <japicmp.skip>false</japicmp.skip>
+               <japicmp.referenceVersion>1.14.0</japicmp.referenceVersion>
+               <japicmp.outputDir>tools/japicmp-output</japicmp.outputDir>
+
+               <slf4j.version>1.7.36</slf4j.version>
+               <log4j.version>2.17.2</log4j.version>
+
+               <flink.convergence.phase>validate</flink.convergence.phase>
+               <test.randomization.seed/>
+               <test.unit.pattern>**/*Test.*</test.unit.pattern>
+
+               <aws.sdk.version>2.17.249</aws.sdk.version>
+       </properties>
+
+
+
+       <dependencyManagement>

Review Comment:
   Let's use the AWS BOM instead of individual dependencies, see https://github.com/apache/flink/pull/20521/files
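   
   For reference, the shape of the change, as a sketch (the linked PR is authoritative on the exact coordinates):
   
   ```xml
   <dependencyManagement>
       <dependencies>
           <!-- Import the AWS SDK v2 BOM so individual SDK artifacts below
                no longer need explicit <version> tags. -->
           <dependency>
               <groupId>software.amazon.awssdk</groupId>
               <artifactId>bom</artifactId>
               <version>${aws.sdk.version}</version>
               <type>pom</type>
               <scope>import</scope>
           </dependency>
       </dependencies>
   </dependencyManagement>
   ```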



##########
flink-connector-aws-dynamodb/pom.xml:
##########
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>

Review Comment:
   Out of the box I get a compilation error when running `mvn clean package`:
   
   ```
   [ERROR] flink-connector-dynamodb/flink-connector-aws-dynamodb/src/test/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkITCase.java:[60,76] cannot find symbol
   [ERROR]   symbol:   variable DYNAMODB
   [ERROR]   location: class org.apache.flink.util.DockerImageVersions
   ```
   



##########
flink-connector-aws-dynamodb/pom.xml:
##########
@@ -0,0 +1,162 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connectors</artifactId>
+        <version>1.16-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-connector-aws-dynamodb</artifactId>
+    <name>Flink : Connectors : Amazon DynamoDB</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-force-shading</artifactId>
+        </dependency>
+        <!-- Connectors -->

Review Comment:
   Fix formatting here, move comment down



##########
flink-connector-aws-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/DynamoDbAttributeValueUtils.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbAttributeValue;
+import org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbRequest;
+
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
+import software.amazon.awssdk.services.dynamodb.model.PutRequest;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Utils class to convert this sink's representations into AWS SDK v2 DynamoDB classes. */
+@Internal
+public class DynamoDbAttributeValueUtils {
+
+    public static WriteRequest toWriteRequest(DynamoDbRequest dynamoDbRequest) {

Review Comment:
   This method seems out of place, given the util class pertains to `DynamoDbAttributeValue`.
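   
   e.g. a hypothetical split that keeps `DynamoDbAttributeValueUtils` focused on `DynamoDbAttributeValue` (class name illustrative, delete path omitted for brevity):
   
   ```java
   import org.apache.flink.annotation.Internal;
   import org.apache.flink.streaming.connectors.dynamodb.sink.DynamoDbRequest;
   import org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbAttributeValueUtils;
   
   import software.amazon.awssdk.services.dynamodb.model.PutRequest;
   import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
   
   // Hypothetical home for the request-level mapping; the AttributeValue map
   // conversion stays in DynamoDbAttributeValueUtils.
   @Internal
   public final class DynamoDbRequestUtils {
   
       private DynamoDbRequestUtils() {}
   
       public static WriteRequest toWriteRequest(DynamoDbRequest dynamoDbRequest) {
           return WriteRequest.builder()
                   .putRequest(
                           PutRequest.builder()
                                   .item(
                                           DynamoDbAttributeValueUtils.toAttributeValueMap(
                                                   dynamoDbRequest.putRequest().item()))
                                   .build())
                   .build();
       }
   }
   ```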


