tsreaper commented on code in PR #2291:
URL: https://github.com/apache/incubator-paimon/pull/2291#discussion_r1387586217
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java:
##########
@@ -95,4 +117,35 @@ public void close() {
public Snapshot snapshot() {
return snapshot;
}
+
+ @Override
+ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof DynamicFilteringEvent) {
+ DynamicFilteringData dynamicFilteringData =
+ ((DynamicFilteringEvent) sourceEvent).getData();
+ LOG.warn(
Review Comment:
`LOG.info` is enough.
##########
paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java:
##########
@@ -509,4 +510,8 @@ private RollbackHelper rollbackHelper() {
store().newSnapshotDeletion(),
store().newTagDeletion());
}
+
+ public RowType logicPartitionType() {
+ return tableSchema.logicalPartitionType();
+ }
Review Comment:
We can get `schema()` directly from a `FileStoreTable`. Why introduce this
new method?
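A minimal sketch of the point above, assuming stand-in types rather than the real Paimon classes: `TableSchema`, `FileStoreTable`, and `SimpleTable` below are simplified placeholders, and the partition type is modeled as a list of field names instead of a `RowType`. It only illustrates that a caller holding the interface can reach the partition type through `schema()`, so no extra accessor is needed on the abstract class.

```java
import java.util.Arrays;
import java.util.List;

public class SchemaAccessSketch {

    // Placeholder for Paimon's TableSchema; only one accessor is modeled,
    // and it returns field names instead of a RowType (an assumption).
    static final class TableSchema {
        private final List<String> partitionKeys;

        TableSchema(List<String> partitionKeys) {
            this.partitionKeys = partitionKeys;
        }

        List<String> logicalPartitionType() {
            return partitionKeys;
        }
    }

    // Placeholder for the FileStoreTable interface, which exposes schema().
    interface FileStoreTable {
        TableSchema schema();
    }

    // Hypothetical concrete table used only for this illustration.
    static final class SimpleTable implements FileStoreTable {
        private final TableSchema schema;

        SimpleTable(TableSchema schema) {
            this.schema = schema;
        }

        @Override
        public TableSchema schema() {
            return schema;
        }
    }

    // The caller depends on the interface only and goes through schema().
    static List<String> partitionFields(FileStoreTable table) {
        return table.schema().logicalPartitionType();
    }

    public static void main(String[] args) {
        FileStoreTable table =
                new SimpleTable(new TableSchema(Arrays.asList("dt", "region")));
        System.out.println(partitionFields(table)); // prints [dt, region]
    }
}
```

With this shape, the builder can cast to the interface instead of the abstract class.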
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java:
##########
@@ -125,6 +129,22 @@ public FlinkSourceBuilder withWatermarkStrategy(
return this;
}
+ public FlinkSourceBuilder withDynamicPartitionFilteringFields(
+ List<String> dynamicPartitionFilteringFields) {
+ if (dynamicPartitionFilteringFields != null && !dynamicPartitionFilteringFields.isEmpty()) {
+ checkState(
+ table instanceof AbstractFileStoreTable,
+ "Only Paimon AbstractFileStoreTable supports dynamic filtering but get %s.",
+ table.getClass().getName());
+
+ this.dynamicPartitionFilteringInfo =
+ new DynamicPartitionFilteringInfo(
+ ((AbstractFileStoreTable) table).logicPartitionType(),
+ dynamicPartitionFilteringFields);
Review Comment:
```suggestion
checkState(
        table instanceof FileStoreTable,
        "Only Paimon FileStoreTable supports dynamic filtering but get %s.",
        table.getClass().getName());
this.dynamicPartitionFilteringInfo =
        new DynamicPartitionFilteringInfo(
                ((FileStoreTable) table).schema().logicalPartitionType(),
                dynamicPartitionFilteringFields);
```