[ 
https://issues.apache.org/jira/browse/BAHIR-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16800596#comment-16800596
 ] 

ASF GitHub Bot commented on BAHIR-202:
--------------------------------------

SuXingLee commented on pull request #50: [BAHIR-202] Improve KuduSink throughput by using async FlushMode
URL: https://github.com/apache/bahir-flink/pull/50#discussion_r268597819
 
 

 ##########
 File path: flink-connector-kudu/src/main/java/org/apache/flink/streaming/connectors/kudu/KuduSink.java
 ##########
 @@ -79,9 +86,12 @@ public KuduSink(String kuduMasters, KuduTableInfo tableInfo, KuduSerialization<O
 
     @Override
     public void open(Configuration parameters) throws IOException {
-        if (connector != null) return;
-        connector = new KuduConnector(kuduMasters, tableInfo, consistency, writeMode);
-        serializer.withSchema(tableInfo.getSchema());
+        if (this.connector != null) return;
+        FlushMode flushMode = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled() ?
 
 Review comment:
   In my understanding they are not equivalent.
   - ```EVENTUAL``` and ```STRONG``` mostly determine whether messages are sent to Kudu out of order or in order.
   - ```AUTO_FLUSH_BACKGROUND``` and ```AUTO_FLUSH_SYNC``` mostly determine whether messages are sent to Kudu in batches or one at a time.
   
   ```response.join()``` synchronously waits until the Deferred is called back. 
If the Deferred has already completed, the method returns (or throws) 
immediately. Otherwise, the current thread blocks until the Deferred is called 
back.
   ```response.join()``` is only useful when you need strong consistency across 
interleaved operations, e.g. when operation A and operation B must be executed 
in order. When we use ```session.apply(operation)```, each operation uses the 
client to send an RPC request. That step is synchronous, but the Kudu server 
may still execute B faster than A, so we may receive B's callback first. 
Consistency is therefore not guaranteed unless we use ```response.join()``` to 
wait for A's callback before executing B.
   However, whether or not we use ```response.join()```, in the default 
FlushMode ```session.apply(operation)``` executes operations one by one, not in 
batches. If we want to buffer writes to improve throughput, we can use another 
FlushMode such as ```AUTO_FLUSH_BACKGROUND``` and rely on checkpointing to 
ensure at-least-once delivery.
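
   To make the batching point concrete, here is a minimal, self-contained 
sketch. It does not use the Kudu client or Flink APIs: ```FakeTable```, 
```Sink```, ```Mode``` and ```snapshotState()``` are hypothetical stand-ins 
that only model the idea. In the synchronous mode every row costs one round 
trip; in the buffered mode rows are sent in batches, and the buffer is drained 
at checkpoint time so that no row is still pending once the checkpoint 
completes. Assuming the job replays from the last checkpoint after a failure, 
rows may be written twice but not lost, which is at-least-once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: none of these types belong to the Kudu client or to Flink.
class BufferedSinkSketch {

    /** Hypothetical modes standing in for one-by-one and batched sending. */
    enum Mode { SYNC, BACKGROUND }

    /** Stand-in for the remote table; counts round trips instead of doing RPCs. */
    static final class FakeTable {
        final List<String> rows = new ArrayList<>();
        int rpcCount = 0;

        void writeBatch(List<String> batch) {
            rpcCount++;            // one simulated round trip per batch
            rows.addAll(batch);
        }
    }

    /** Stand-in for the sink: writes through in SYNC mode, buffers in BACKGROUND mode. */
    static final class Sink {
        private final FakeTable table;
        private final Mode mode;
        private final int capacity;
        private final List<String> buffer = new ArrayList<>();

        Sink(FakeTable table, Mode mode, int capacity) {
            this.table = table;
            this.mode = mode;
            this.capacity = capacity;
        }

        void invoke(String row) {
            if (mode == Mode.SYNC) {
                table.writeBatch(Collections.singletonList(row));
                return;
            }
            buffer.add(row);
            if (buffer.size() >= capacity) {
                flush();           // buffer full: send one batch
            }
        }

        /** Called at checkpoint time: drain the buffer before the checkpoint completes. */
        void snapshotState() {
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            table.writeBatch(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            FakeTable table = new FakeTable();
            Sink sink = new Sink(table, mode, 3);
            for (int i = 0; i < 7; i++) {
                sink.invoke("row-" + i);
            }
            sink.snapshotState();  // checkpoint: push out whatever is still buffered
            System.out.println(mode + ": rows=" + table.rows.size() + ", rpcs=" + table.rpcCount);
        }
    }
}
```

   With seven rows and a buffer capacity of three, the synchronous mode makes 
seven round trips, while the buffered mode makes three (two full batches plus 
the flush at the checkpoint).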
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve KuduSink throughput by using async FlushMode
> ----------------------------------------------------
>
>                 Key: BAHIR-202
>                 URL: https://issues.apache.org/jira/browse/BAHIR-202
>             Project: Bahir
>          Issue Type: Improvement
>          Components: Flink Streaming Connectors
>    Affects Versions: Flink-1.0
>            Reporter: Suxing Lee
>            Priority: Major
>             Fix For: Flink-Next
>
>
> Improve KuduSink throughput by using async FlushMode.
> And using checkpoint to ensure at-least-once in async flush mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
