[ 
https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer reassigned FLINK-35022:
-------------------------------------

    Assignee: Ahmed Hamdy

> Add TypeInformed Element Converter for DynamoDbSink
> ---------------------------------------------------
>
>                 Key: FLINK-35022
>                 URL: https://issues.apache.org/jira/browse/FLINK-35022
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.3.0
>            Reporter: Ahmed Hamdy
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on 
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert 
> Flink stream objects to DynamoDb write requests, where an item is represented 
> as {{Map<String, AttributeValue>}} [1].
> {{AttributeValue}} is the wrapper for a value that DynamoDb can understand, in 
> a format that carries type-identifying properties, as in
> {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
> Since TypeInformation is already natively supported in Flink, many 
> implementations of the DynamoDb ElementConverter are just boilerplate. 
> For example, given the POJO
> {code:title="Simple POJO Element Conversion"}
> public class Order {
>     String id;
>     int quantity;
>     double total;
> }
> {code}
> the converter has to be written by hand as
> {code:title="Simple POJO DDB Element Converter"}
> public static class SimplePojoElementConverter
>         implements ElementConverter<Order, DynamoDbWriteRequest> {
>     @Override
>     public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
>         // Each field is wrapped by hand in the matching DynamoDb type:
>         // strings as S, numbers as N (sent as strings).
>         Map<String, AttributeValue> itemMap = new HashMap<>();
>         itemMap.put("id", AttributeValue.builder().s(order.id).build());
>         itemMap.put("quantity",
>                 AttributeValue.builder().n(String.valueOf(order.quantity)).build());
>         itemMap.put("total",
>                 AttributeValue.builder().n(String.valueOf(order.total)).build());
>         return DynamoDbWriteRequest.builder()
>                 .setType(DynamoDbWriteRequestType.PUT)
>                 .setItem(itemMap)
>                 .build();
>     }
>
>     @Override
>     public void open(Sink.InitContext context) {
>         // Nothing to initialize.
>     }
> }
> {code}
> While this is not much work, it is a fairly common case in Flink, and the 
> implementation requires new users to have a fair knowledge of the DDB data 
> model.
> h2. Proposal 
> Introduce {{DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementConverter"} 
> // Pseudocode sketch of the proposal, not compilable as written.
> public class DynamoDbTypeInformedElementConverter<InputT>
>         implements ElementConverter<InputT, DynamoDbWriteRequest> {
>     DynamoDbTypeInformedElementConverter(CompositeType<InputT> typeInfo);
>
>     public DynamoDbWriteRequest convertElement(input) {
>         switch (this.typeInfo) {
>             case BasicTypeInfo.STRING_TYPE_INFO:
>                 return input -> AttributeValue.fromS(input.toString());
>             case BasicTypeInfo.SHORT_TYPE_INFO:
>             case BasicTypeInfo.INT_TYPE_INFO:
>                 return input -> AttributeValue.fromN(input.toString());
>             case TupleTypeInfo:
>                 return input -> AttributeValue.fromL(convertTuple(input));
>             .....
>         }
>     }
> }
>
> // User code
> public static void main(String[] args) {
>     DynamoDbTypeInformedElementConverter<Order> elementConverter =
>             new DynamoDbTypeInformedElementConverter<>(
>                     (CompositeType<Order>) TypeInformation.of(Order.class));
>     DdbSink.setElementConverter(elementConverter);
> }
> {code}
> We will start by supporting all POJO, basic, Tuple, and Array type 
> information, which should be enough to cover all DDB-supported types 
> (s, n, bool, b, ss, ns, bs, bools, m, l).
> [1] https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html
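
The type-driven mapping described in the quoted issue can be illustrated without Flink or the AWS SDK. The sketch below is only an approximation under stated assumptions: plain java.util.Map values stand in for AttributeValue, Java reflection stands in for Flink's TypeInformation, and the names TypeInformedSketch, toAttribute and toItem are hypothetical and not part of the connector. It shows how a value's runtime type could select the DynamoDB type tag (S, N, BOOL, L, M), so that a POJO such as Order needs no hand-written converter.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Maps stand in for the SDK's AttributeValue,
// reflection stands in for Flink's TypeInformation.
class TypeInformedSketch {

    // Same shape as the Order POJO in the issue description.
    static class Order {
        String id;
        int quantity;
        double total;

        Order(String id, int quantity, double total) {
            this.id = id;
            this.quantity = quantity;
            this.total = total;
        }
    }

    // Wraps one value in a single-entry map keyed by its DynamoDB type tag.
    static Map<String, Object> toAttribute(Object value) {
        if (value == null) {
            return Map.of("NULL", true);
        }
        if (value instanceof String) {
            return Map.of("S", value);
        }
        if (value instanceof Number) {
            // DynamoDB transports numbers as strings.
            return Map.of("N", value.toString());
        }
        if (value instanceof Boolean) {
            return Map.of("BOOL", value);
        }
        if (value instanceof List) {
            List<Object> converted = new ArrayList<>();
            for (Object element : (List<?>) value) {
                converted.add(toAttribute(element));
            }
            return Map.of("L", converted);
        }
        // Anything else is treated as a nested POJO and becomes a map ("M").
        return Map.of("M", toItem(value));
    }

    // Converts every non-static declared field of a POJO into an item entry.
    static Map<String, Object> toItem(Object pojo) {
        Map<String, Object> item = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                item.put(field.getName(), toAttribute(field.get(pojo)));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return item;
    }

    public static void main(String[] args) {
        // Prints the item map built from the Order fields.
        System.out.println(toItem(new Order("order-1", 3, 9.5)));
    }
}
```

A real converter would resolve field types once from the TypeInformation at construction time rather than inspecting each record, and would still have to handle the set types (ss, ns, bs) and binary values, which this sketch omits.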



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
