MOBIN-F commented on code in PR #3945:
URL: https://github.com/apache/flink-cdc/pull/3945#discussion_r1990457803


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java:
##########
@@ -48,23 +50,37 @@ public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serial
 
     private final RowAssignerChannelComputer channelComputer;
 
+    private final int parallelism;
+
     public PaimonHashFunction(
             Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        this.parallelism = parallelism;
         Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
         FileStoreTable table;
         try {
             table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));
         } catch (Catalog.TableNotExistException e) {
             throw new RuntimeException(e);
         }
-        this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
-        channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
-        channelComputer.setup(parallelism);
+        if (table instanceof AppendOnlyFileStore) {
+            this.fieldGetters = null;
+            channelComputer = null;
+        } else {
+            this.fieldGetters = PaimonWriterHelper.createFieldGetters(schema, zoneId);
+            channelComputer = new RowAssignerChannelComputer(table.schema(), parallelism);
+            channelComputer.setup(parallelism);
+        }
     }
 
     @Override
     public int hashcode(DataChangeEvent event) {
-        GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
-        return channelComputer.channel(genericRow);
+        if (channelComputer != null) {
+            GenericRow genericRow =
+                    PaimonWriterHelper.convertEventToGenericRow(event, fieldGetters);
+            return channelComputer.channel(genericRow);
+        } else {
+            // Avoid sending all events to the same subtask when table has no primary key.
+            return RandomUtils.nextInt(0, parallelism);

Review Comment:
   Would it be better to use ThreadLocalRandom.current().nextInt(parallelism) here?
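
   For illustration, a minimal JDK-only sketch of the suggested call. The class `RandomChannelSketch` and the method `randomChannel` are hypothetical names that do not appear in the PR. Assuming `RandomUtils` in the diff is the Apache Commons Lang class, `nextInt(0, parallelism)` and `ThreadLocalRandom.current().nextInt(parallelism)` both return a value in the range [0, parallelism); the latter uses a per-thread generator from the standard library.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of the PR: picks a random channel index.
final class RandomChannelSketch {

    // Returns a uniformly distributed value in [0, parallelism).
    // ThreadLocalRandom.nextInt(bound) throws IllegalArgumentException if bound <= 0.
    static int randomChannel(int parallelism) {
        return ThreadLocalRandom.current().nextInt(parallelism);
    }
}
```

   In `hashcode`, the `else` branch could then return `ThreadLocalRandom.current().nextInt(parallelism)` directly, if the project prefers not to depend on `RandomUtils` for this.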


