hawk9821 commented on code in PR #7728:
URL: https://github.com/apache/seatunnel/pull/7728#discussion_r1773011363
##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java:
##########
@@ -59,27 +65,85 @@ public PaimonBucketAssigner(Table table, int numAssigners, int assignId) {
     private void loadBucketIndex(FileStoreTable fileStoreTable, int numAssigners, int assignId) {
         IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
+        List<String> fieldNames = schema.fieldNames();
+        Map<String, Integer> fieldIndexMap =
+                IntStream.range(0, fieldNames.size())
+                        .boxed()
+                        .collect(Collectors.toMap(fieldNames::get, Function.identity()));
+        List<DataField> primaryKeys = schema.primaryKeysFields();
         try (RecordReader<InternalRow> recordReader =
                 indexBootstrap.bootstrap(numAssigners, assignId)) {
             RecordReaderIterator<InternalRow> readerIterator =
                     new RecordReaderIterator<>(recordReader);
             while (readerIterator.hasNext()) {
                 InternalRow row = readerIterator.next();
-                assign(row);
+                GenericRow binaryRow = new GenericRow(fieldNames.size());
+                for (int i = 0; i < primaryKeys.size(); i++) {
Review Comment:
Dynamic bucket splitting currently supports only primary key tables, not
append-only tables. An append-only table's bucketMode is UNAWARE, so the
dynamic bucket splitting logic is not executed for it.
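
To make the distinction concrete, here is a minimal sketch of the kind of guard I mean. It is not the connector's actual code: the `BucketMode` enum below is a local stand-in with illustrative names, and `needsDynamicBucketAssigner` is a hypothetical helper, both introduced only for this example.

```java
public class BucketModeGuard {

    /** Local stand-in for the table's bucket mode; names are illustrative only. */
    enum BucketMode { FIXED, DYNAMIC, UNAWARE }

    /**
     * Hypothetical helper: the dynamic bucket assigner is only relevant for a
     * primary key table whose bucket mode is dynamic. An append-only table
     * (no primary keys, mode UNAWARE) never reaches that logic.
     */
    static boolean needsDynamicBucketAssigner(boolean hasPrimaryKeys, BucketMode mode) {
        return hasPrimaryKeys && mode == BucketMode.DYNAMIC;
    }

    public static void main(String[] args) {
        // Primary key table with dynamic buckets: the assigner runs.
        System.out.println(needsDynamicBucketAssigner(true, BucketMode.DYNAMIC));
        // Append-only table: the assigner is skipped.
        System.out.println(needsDynamicBucketAssigner(false, BucketMode.UNAWARE));
    }
}
```

Under that assumption, the loop over `primaryKeys` in this hunk only ever sees tables that do have primary keys.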