twalthr commented on a change in pull request #16984:
URL: https://github.com/apache/flink/pull/16984#discussion_r696391827
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
##########
@@ -131,4 +131,19 @@
* @see DecodingFormat#applyReadableMetadata(List)
*/
    void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
+
+    /**
+     * Defines whether projections can be applied to metadata columns.
+     *
+     * <p>This method is only called if the source does <em>not</em> implement {@link
+     * SupportsProjectionPushDown}. In this case, by default the planner will apply all metadata
+     * declared in a table's schema. By returning {@code true} instead the source can inform the
+     * planner that it should only apply metadata columns which have actually been selected.
Review comment:
nit: `have actually been selected in the query`
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,222 +54,324 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
/**
- * Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also pushes projected
+ * metadata into the source. For sources implementing {@link SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that metadata should be
Review comment:
nit: "if the source indicates that..." can simply be replace with
linking to the method itself here
##########
File path:
flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
##########
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.flink.table.planner.factories.TestValuesTableFactory
-org.apache.flink.table.planner.factories.TestFileFactory
\ No newline at end of file
+org.apache.flink.table.planner.factories.TestFileFactory
+org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRuleTest$MetadataNoProjectionPushDownTableFactory
Review comment:
as mentioned above, we should avoid this; once we start with such a test
setup, others will copy it as well
##########
File path:
flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java
##########
@@ -128,4 +128,19 @@
* @see DecodingFormat#applyReadableMetadata(List)
*/
    void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType);
+
+    /**
+     * Defines whether projections can be applied to metadata columns.
+     *
+     * <p>This method is only called if the source does <em>not</em> implement {@link
+     * SupportsProjectionPushDown}. In this case, by default the planner will apply all metadata
+     * declared in a table's schema. By returning {@code true} instead the source can inform the
+     * planner that it should only apply metadata columns which have actually been selected.
+     *
+     * <p>If the source implements {@link SupportsProjectionPushDown}, projections of metadata
+     * columns are always considered before calling {@link #applyReadableMetadata(List, DataType)}.
+     */
+    default boolean supportsMetadataProjection() {
+        return false;
Review comment:
+1 for defaulting to `true`; performance should be preferred here. I don't
think it will affect many existing user pipelines.
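For reference, the change would simply be (sketch):
```java
default boolean supportsMetadataProjection() {
    // Prefer performance: project metadata by default. Sources that rely on
    // re-using the same instance for different queries can still return false.
    return true;
}
```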
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -243,4 +270,153 @@ public void testProjectFieldAccessWithITEM() {
+ "`outer_map`['item'] "
+ "FROM ItemTable");
}
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupported() {
+        createMetadataTableWithoutProjectionPushDown("T1", true);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T1");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupported() {
+        createMetadataTableWithoutProjectionPushDown("T2", false);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T2");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T3", true);
+
+        util().verifyRelPlan("SELECT 1 FROM T3");
+        assertThat(MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(), hasSize(0));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T4", false);
+
+        util().verifyRelPlan("SELECT 1 FROM T4");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    private void createMetadataTableWithoutProjectionPushDown(
+            String name, boolean supportsMetadataProjection) {
+        util().tableEnv()
+                .createTable(
+                        name,
+                        TableDescriptor.forConnector(
+                                        MetadataNoProjectionPushDownTableFactory.IDENTIFIER)
+                                .schema(
+                                        Schema.newBuilder()
+                                                .columnByMetadata("m1", STRING())
+                                                .columnByMetadata("metadata", STRING(), "m2")
+                                                .columnByMetadata("m3", STRING())
+                                                .build())
+                                .option(SUPPORTS_METADATA_PROJECTION, supportsMetadataProjection)
+                                .build());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /** Factory for {@link Source}. */
+    public static class MetadataNoProjectionPushDownTableFactory
+            implements DynamicTableSourceFactory {
+        public static final String IDENTIFIER = "metadataNoProjectionPushDown";
+
+        public static final ConfigOption<Boolean> SUPPORTS_METADATA_PROJECTION =
+                ConfigOptions.key("supports-metadata-projection").booleanType().defaultValue(true);
+
+        public static ThreadLocal<List<String>> appliedMetadataKeys = new ThreadLocal<>();
Review comment:
we have a special util for extracting static instances: `SharedObjects`
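Roughly like this (a sketch assuming the JUnit-rule-based `SharedObjects`/`SharedReference` utilities from flink-test-utils; names are illustrative):
```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;

import org.junit.Rule;
import org.junit.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;

public class ExampleRuleTest {

    // Manages instances that are shared between the test and the (de)serialized source.
    @Rule public final SharedObjects sharedObjects = SharedObjects.create();

    @Test
    public void testMetadataProjection() {
        // Serializable handle instead of a static ThreadLocal on the factory.
        final SharedReference<List<String>> appliedKeys =
                sharedObjects.add(new CopyOnWriteArrayList<>());
        // ... create the table with a source that records the keys it receives
        // in applyReadableMetadata(...) via appliedKeys.get().addAll(keys) ...
        assertThat(appliedKeys.get(), contains("m1", "m2"));
    }
}
```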
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,222 +54,324 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
/**
- * Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also pushes projected
+ * metadata into the source. For sources implementing {@link SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that metadata should be
+ * projected. This is important for some sources which would not be re-usable if different instances
+ * (due to different projected metadata) of the source were used together.
 */
-public class PushProjectIntoTableSourceScanRule extends RelOptRule {
-    public static final PushProjectIntoTableSourceScanRule INSTANCE =
-            new PushProjectIntoTableSourceScanRule();
-
-    public PushProjectIntoTableSourceScanRule() {
-        super(
-                operand(LogicalProject.class, operand(LogicalTableScan.class, none())),
-                "PushProjectIntoTableSourceScanRule");
+@Internal
+public class PushProjectIntoTableSourceScanRule
+        extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            Config.EMPTY.as(Config.class).onProjectedScan().toRule();
+
+    public PushProjectIntoTableSourceScanRule(Config config) {
+        super(config);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
-        LogicalTableScan scan = call.rel(1);
-        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        if (tableSourceTable == null
-                || !(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
+        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class);
+        if (sourceTable == null) {
            return false;
        }
-        return Arrays.stream(tableSourceTable.abilitySpecs())
-                .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+
+        // The source supports projection push-down.
+        if (supportsProjectionPushDown(sourceTable.tableSource())) {
+            return Arrays.stream(sourceTable.abilitySpecs())
+                    .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+        }
+
+        // The source supports metadata and wants them to be projected even if projection push-down
+        // (for physical columns) is not supported.
+        if (supportsMetadata(sourceTable.tableSource())) {
+            if (Arrays.stream(sourceTable.abilitySpecs())
+                    .anyMatch(spec -> spec instanceof ReadingMetadataSpec)) {
+                return false;
+            }
+
+            return ((SupportsReadingMetadata) sourceTable.tableSource())
+                    .supportsMetadataProjection();
+        }
+
+        return false;
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        final LogicalProject project = call.rel(0);
        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
Review comment:
nit: above you called it `sourceTable`
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
##########
@@ -234,7 +234,7 @@ private static RowKind parseRowKind(String
rowKindShortString) {
public static final AtomicInteger RESOURCE_COUNTER = new AtomicInteger();
- private static final String IDENTIFIER = "values";
+ public static final String IDENTIFIER = "values";
Review comment:
btw, if you find some time, we could also update this class to the new
`TestValuesOptions` class structure to reference options programmatically
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,222 +54,324 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
/**
- * Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also pushes projected
+ * metadata into the source. For sources implementing {@link SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that metadata should be
+ * projected. This is important for some sources which would not be re-usable if different instances
+ * (due to different projected metadata) of the source were used together.
 */
-public class PushProjectIntoTableSourceScanRule extends RelOptRule {
-    public static final PushProjectIntoTableSourceScanRule INSTANCE =
-            new PushProjectIntoTableSourceScanRule();
-
-    public PushProjectIntoTableSourceScanRule() {
-        super(
-                operand(LogicalProject.class, operand(LogicalTableScan.class, none())),
-                "PushProjectIntoTableSourceScanRule");
+@Internal
+public class PushProjectIntoTableSourceScanRule
+        extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            Config.EMPTY.as(Config.class).onProjectedScan().toRule();
+
+    public PushProjectIntoTableSourceScanRule(Config config) {
+        super(config);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
-        LogicalTableScan scan = call.rel(1);
-        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        if (tableSourceTable == null
-                || !(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
+        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class);
+        if (sourceTable == null) {
            return false;
        }
-        return Arrays.stream(tableSourceTable.abilitySpecs())
-                .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+
+        // The source supports projection push-down.
+        if (supportsProjectionPushDown(sourceTable.tableSource())) {
Review comment:
nit: introduce a variable `source` to replace all occurrences of
`sourceTable.tableSource()`
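i.e. something like:
```java
final DynamicTableSource source = sourceTable.tableSource();

if (supportsProjectionPushDown(source)) {
    return Arrays.stream(sourceTable.abilitySpecs())
            .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
}
```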
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -243,4 +270,153 @@ public void testProjectFieldAccessWithITEM() {
+ "`outer_map`['item'] "
+ "FROM ItemTable");
}
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupported() {
+        createMetadataTableWithoutProjectionPushDown("T1", true);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T1");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupported() {
+        createMetadataTableWithoutProjectionPushDown("T2", false);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T2");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T3", true);
+
+        util().verifyRelPlan("SELECT 1 FROM T3");
+        assertThat(MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(), hasSize(0));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T4", false);
+
+        util().verifyRelPlan("SELECT 1 FROM T4");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    private void createMetadataTableWithoutProjectionPushDown(
+            String name, boolean supportsMetadataProjection) {
+        util().tableEnv()
+                .createTable(
+                        name,
+                        TableDescriptor.forConnector(
+                                        MetadataNoProjectionPushDownTableFactory.IDENTIFIER)
+                                .schema(
+                                        Schema.newBuilder()
+                                                .columnByMetadata("m1", STRING())
+                                                .columnByMetadata("metadata", STRING(), "m2")
+                                                .columnByMetadata("m3", STRING())
+                                                .build())
+                                .option(SUPPORTS_METADATA_PROJECTION, supportsMetadataProjection)
+                                .build());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /** Factory for {@link Source}. */
+    public static class MetadataNoProjectionPushDownTableFactory
Review comment:
How about we create a new kind of factory that is more multipurpose? It
would not implement a runtime provider, and the implemented interfaces could be
selected via options. We can simply code-generate that, and with `SharedObjects` we
basically store the complete signature whenever methods are called by the
planner. Let's do this either now or immediately after this PR. We should
provide better infrastructure to make our tests more lightweight. A rough
sketch follows below.
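Something like this (all names hypothetical; the proxy would need a few more special cases in practice):
```java
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.List;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Hypothetical multipurpose test factory: implemented abilities are chosen via an option. */
public class RecordingTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<List<String>> ABILITIES =
            ConfigOptions.key("abilities").stringType().asList().defaultValues();

    @Override
    public String factoryIdentifier() {
        return "recording";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(ABILITIES);
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        final ReadableConfig options =
                FactoryUtil.createTableFactoryHelper(this, context).getOptions();

        final List<String> abilities = options.get(ABILITIES);
        final Class<?>[] interfaces = new Class<?>[abilities.size() + 1];
        interfaces[0] = ScanTableSource.class;
        for (int i = 0; i < abilities.size(); i++) {
            interfaces[i + 1] = toAbility(abilities.get(i));
        }

        // Generate the source as a dynamic proxy; every call made by the planner
        // can be recorded (e.g. into a SharedObjects reference) before returning
        // a minimal default value.
        return (DynamicTableSource)
                Proxy.newProxyInstance(
                        RecordingTableFactory.class.getClassLoader(),
                        interfaces,
                        (proxy, method, args) -> {
                            // record(method, args);
                            switch (method.getName()) {
                                case "getChangelogMode":
                                    return ChangelogMode.insertOnly();
                                case "copy":
                                    return proxy; // sufficient for plan tests
                                case "asSummaryString":
                                    return "recording-source";
                                case "supportsMetadataProjection":
                                case "supportsNestedProjection":
                                    return false;
                                default:
                                    return null;
                            }
                        });
    }

    private static Class<?> toAbility(String name) {
        switch (name) {
            case "reading-metadata":
                return SupportsReadingMetadata.class;
            case "projection-push-down":
                return SupportsProjectionPushDown.class;
            default:
                throw new IllegalArgumentException("Unknown ability: " + name);
        }
    }
}
```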
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,222 +54,324 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
/**
- * Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also pushes projected
+ * metadata into the source. For sources implementing {@link SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that metadata should be
+ * projected. This is important for some sources which would not be re-usable if different instances
+ * (due to different projected metadata) of the source were used together.
 */
-public class PushProjectIntoTableSourceScanRule extends RelOptRule {
-    public static final PushProjectIntoTableSourceScanRule INSTANCE =
-            new PushProjectIntoTableSourceScanRule();
-
-    public PushProjectIntoTableSourceScanRule() {
-        super(
-                operand(LogicalProject.class, operand(LogicalTableScan.class, none())),
-                "PushProjectIntoTableSourceScanRule");
+@Internal
+public class PushProjectIntoTableSourceScanRule
+        extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            Config.EMPTY.as(Config.class).onProjectedScan().toRule();
+
+    public PushProjectIntoTableSourceScanRule(Config config) {
+        super(config);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
-        LogicalTableScan scan = call.rel(1);
-        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        if (tableSourceTable == null
-                || !(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
+        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class);
+        if (sourceTable == null) {
            return false;
        }
-        return Arrays.stream(tableSourceTable.abilitySpecs())
-                .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+
+        // The source supports projection push-down.
+        if (supportsProjectionPushDown(sourceTable.tableSource())) {
+            return Arrays.stream(sourceTable.abilitySpecs())
+                    .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+        }
+
+        // The source supports metadata and wants them to be projected even if projection push-down
+        // (for physical columns) is not supported.
+        if (supportsMetadata(sourceTable.tableSource())) {
+            if (Arrays.stream(sourceTable.abilitySpecs())
+                    .anyMatch(spec -> spec instanceof ReadingMetadataSpec)) {
+                return false;
+            }
+
+            return ((SupportsReadingMetadata) sourceTable.tableSource())
+                    .supportsMetadataProjection();
+        }
+
+        return false;
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        final LogicalProject project = call.rel(0);
        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+
+        final boolean supportsNestedProjection = supportsNestedProjection(source.tableSource());
        final int[] refFields = RexNodeExtractor.extractRefInputFields(project.getProjects());
-        TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        final ResolvedSchema oldSchema = oldTableSourceTable.catalogTable().getResolvedSchema();
-        final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
-        final TableConfig config = ShortcutUtils.unwrapContext(scan).getTableConfig();
-
-        final boolean supportsNestedProjection =
-                ((SupportsProjectionPushDown) oldTableSourceTable.tableSource())
-                        .supportsNestedProjection();
-        List<String> fieldNames = scan.getRowType().getFieldNames();
-
-        if (!supportsNestedProjection && refFields.length == fieldNames.size()) {
-            // just keep as same as the old plan
-            // TODO: refactor the affected plan
+        if (!supportsNestedProjection && refFields.length == scan.getRowType().getFieldCount()) {
+            // There is no top-level projection and nested projections aren't supported.
            return;
        }
-        List<RexNode> oldProjectsWithPK = new ArrayList<>(project.getProjects());
-        FlinkTypeFactory flinkTypeFactory = ShortcutUtils.unwrapTypeFactory(scan);
-        if (isPrimaryKeyFieldsRequired(oldTableSourceTable, config)) {
-            // add pk into projects for upsert source
-            oldSchema
-                    .getPrimaryKey()
-                    .ifPresent(
-                            pks -> {
-                                for (String name : pks.getColumns()) {
-                                    int index = fieldNames.indexOf(name);
-                                    Column col = oldSchema.getColumn(index).get();
-                                    oldProjectsWithPK.add(
-                                            new RexInputRef(
-                                                    index,
-                                                    flinkTypeFactory.createFieldTypeFromLogicalType(
-                                                            col.getDataType().getLogicalType())));
-                                }
-                            });
-        }
-        // build used schema tree
-        RowType originType = DynamicSourceUtils.createProducedType(oldSchema, oldSource);
-        NestedSchema nestedSchema =
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final ResolvedSchema schema = source.catalogTable().getResolvedSchema();
+        final RowType producedType = createProducedType(schema, source.tableSource());
+        final NestedSchema projectedSchema =
                NestedProjectionUtil.build(
-                        oldProjectsWithPK, flinkTypeFactory.buildRelNodeRowType(originType));
+                        getProjections(project, scan),
+                        typeFactory.buildRelNodeRowType(producedType));
        if (!supportsNestedProjection) {
-            // mark the fields in the top level as leaf
-            for (NestedColumn column : nestedSchema.columns().values()) {
+            for (NestedColumn column : projectedSchema.columns().values()) {
                column.markLeaf();
            }
        }
-        DataType producedDataType = TypeConversions.fromLogicalToDataType(originType);
-
-        List<SourceAbilitySpec> sourceAbilitySpecs = new ArrayList<>();
-        RowType newProducedType;
-        if (oldSource instanceof SupportsReadingMetadata) {
-            List<String> metadataKeys =
-                    DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
-            newProducedType =
-                    applyPhysicalAndMetadataPushDown(
-                            nestedSchema, metadataKeys, originType, sourceAbilitySpecs);
-        } else {
-            int[][] projectedFields = NestedProjectionUtil.convertToIndexArray(nestedSchema);
-            newProducedType =
-                    (RowType)
-                            DataTypeUtils.projectRow(producedDataType, projectedFields)
-                                    .getLogicalType();
-            sourceAbilitySpecs.add(new ProjectPushDownSpec(projectedFields, newProducedType));
-        }
-        DynamicTableSource newSource = oldSource.copy();
-        SourceAbilityContext context = SourceAbilityContext.from(scan);
-        for (SourceAbilitySpec abilitySpec : sourceAbilitySpecs) {
-            abilitySpec.apply(newSource, context);
-        }
+        final List<SourceAbilitySpec> abilitySpecs = new ArrayList<>();
+        final RowType newProducedType =
+                performPushDown(source, projectedSchema, producedType, abilitySpecs);
-        RelDataType newRowType = flinkTypeFactory.buildRelNodeRowType(newProducedType);
+        final DynamicTableSource newTableSource = source.tableSource().copy();
+        final SourceAbilityContext context = SourceAbilityContext.from(scan);
+        abilitySpecs.forEach(spec -> spec.apply(newTableSource, context));
-        // project push down does not change the statistic, we can reuse origin statistic
-        TableSourceTable newTableSourceTable =
-                oldTableSourceTable.copy(
-                        newSource,
+        final RelDataType newRowType = typeFactory.buildRelNodeRowType(newProducedType);
+        final TableSourceTable newSource =
+                source.copy(
+                        newTableSource,
                        newRowType,
-                        getExtraDigests(newRowType, sourceAbilitySpecs),
-                        sourceAbilitySpecs.toArray(new SourceAbilitySpec[0]));
-        LogicalTableScan newScan =
+                        getExtraDigests(abilitySpecs),
+                        abilitySpecs.toArray(new SourceAbilitySpec[0]));
+        final LogicalTableScan newScan =
                new LogicalTableScan(
-                        scan.getCluster(),
-                        scan.getTraitSet(),
-                        scan.getHints(),
-                        newTableSourceTable);
-        // rewrite the input field in projections
-        // the origin projections are enough. Because the upsert source only uses pk info
-        // normalization node.
-        List<RexNode> newProjects =
-                NestedProjectionUtil.rewrite(
-                        project.getProjects(), nestedSchema, call.builder().getRexBuilder());
-        // rewrite new source
-        LogicalProject newProject =
-                project.copy(project.getTraitSet(), newScan, newProjects, project.getRowType());
+                        scan.getCluster(), scan.getTraitSet(), scan.getHints(), newSource);
+        final LogicalProject newProject =
+                project.copy(
+                        project.getTraitSet(),
+                        newScan,
+                        rewriteProjections(call, newSource, projectedSchema),
+                        project.getRowType());
        if (ProjectRemoveRule.isTrivial(newProject)) {
-            // drop project if the transformed program merely returns its input
            call.transformTo(newScan);
        } else {
            call.transformTo(newProject);
        }
    }
-    private static String[] getExtraDigests(
-            RelDataType rowType, List<SourceAbilitySpec> abilitySpecs) {
-        final List<String> digests = new ArrayList<>();
-        digests.add(String.format("project=[%s]", String.join(", ", rowType.getFieldNames())));
+    private boolean supportsProjectionPushDown(DynamicTableSource tableSource) {
+        return tableSource instanceof SupportsProjectionPushDown;
+    }
-        for (SourceAbilitySpec abilitySpec : abilitySpecs) {
-            if (abilitySpec instanceof ReadingMetadataSpec) {
-                final ReadingMetadataSpec metadataSpec = (ReadingMetadataSpec) abilitySpec;
-                digests.add(
-                        String.format(
-                                "metadata=[%s]",
-                                String.join(", ", metadataSpec.getMetadataKeys())));
-            }
+    private boolean supportsMetadata(DynamicTableSource tableSource) {
+        return tableSource instanceof SupportsReadingMetadata;
+    }
+
+    private boolean supportsNestedProjection(DynamicTableSource tableSource) {
+        return supportsProjectionPushDown(tableSource)
+                && ((SupportsProjectionPushDown) tableSource).supportsNestedProjection();
+    }
+
+    private List<RexNode> getProjections(LogicalProject project, LogicalTableScan scan) {
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+        final TableConfig tableConfig = unwrapContext(scan).getTableConfig();
+
+        final List<RexNode> projections = new ArrayList<>(project.getProjects());
+        if (supportsProjectionPushDown(source.tableSource())
+                && requiresPrimaryKey(source, tableConfig)) {
+            projections.addAll(getPrimaryKeyProjections(scan));
        }
-        return digests.toArray(new String[0]);
+        return projections;
    }
-    /** Returns true if the primary key is required and should be retained. */
-    private static boolean isPrimaryKeyFieldsRequired(TableSourceTable table, TableConfig config) {
+    private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig config) {
        return DynamicSourceUtils.isUpsertSource(table.catalogTable(), table.tableSource())
                || DynamicSourceUtils.isSourceChangeEventsDuplicate(
                        table.catalogTable(), table.tableSource(), config);
    }
-    /**
-     * Push the used physical column and metadata into table source. The returned value is used to
-     * build new table schema.
-     */
-    private static RowType applyPhysicalAndMetadataPushDown(
-            NestedSchema nestedSchema,
-            List<String> metadataKeys,
-            RowType originType,
-            List<SourceAbilitySpec> sourceAbilitySpecs) {
-        // TODO: supports nested projection for metadata
-        List<NestedColumn> usedMetaDataFields = new LinkedList<>();
-        int physicalCount = originType.getFieldCount() - metadataKeys.size();
-        List<String> fieldNames = originType.getFieldNames();
-
-        // rm metadata in the tree
-        for (int i = 0; i < metadataKeys.size(); i++) {
-            NestedColumn usedMetadata =
-                    nestedSchema.columns().remove(fieldNames.get(i + physicalCount));
-            if (usedMetadata != null) {
-                usedMetaDataFields.add(usedMetadata);
-            }
+    private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+        final ResolvedSchema schema = source.catalogTable().getResolvedSchema();
+        if (!schema.getPrimaryKey().isPresent()) {
+            return Collections.emptyList();
        }
-        // get path of the used fields
-        int[][] projectedPhysicalFields = NestedProjectionUtil.convertToIndexArray(nestedSchema);
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final UniqueConstraint primaryKey = schema.getPrimaryKey().get();
+        return primaryKey.getColumns().stream()
+                .map(
+                        columnName -> {
+                            final int idx = scan.getRowType().getFieldNames().indexOf(columnName);
+                            final Column column =
+                                    schema.getColumn(idx)
+                                            .orElseThrow(
+                                                    () ->
+                                                            new TableException(
+                                                                    String.format(
+                                                                            "Column at index %d not found.",
+                                                                            idx)));
+                            return new RexInputRef(
+                                    idx,
+                                    typeFactory.createFieldTypeFromLogicalType(
+                                            column.getDataType().getLogicalType()));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private RowType performPushDown(
+            TableSourceTable source,
+            NestedSchema projectedSchema,
+            RowType producedType,
+            List<SourceAbilitySpec> abilitySpecs) {
+        final int numPhysicalColumns;
+        final List<NestedColumn> projectedMetadataColumns;
+        if (supportsMetadata(source.tableSource())) {
+            final List<String> declaredMetadataKeys =
+                    createRequiredMetadataKeys(
+                            source.catalogTable().getResolvedSchema(), source.tableSource());
-        // push the metadata back for later rewrite and extract the location in the origin row
-        int newIndex = projectedPhysicalFields.length;
-        List<String> usedMetadataNames = new LinkedList<>();
-        for (NestedColumn metadata : usedMetaDataFields) {
-            metadata.setIndexOfLeafInNewSchema(newIndex++);
-            nestedSchema.columns().put(metadata.name(), metadata);
-            usedMetadataNames.add(metadataKeys.get(metadata.indexInOriginSchema() - physicalCount));
+            numPhysicalColumns = producedType.getFieldCount() - declaredMetadataKeys.size();
+
+            projectedMetadataColumns =
+                    IntStream.range(0, declaredMetadataKeys.size())
+                            .mapToObj(i -> producedType.getFieldNames().get(numPhysicalColumns + i))
+                            .map(fieldName -> projectedSchema.columns().get(fieldName))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+        } else {
+            numPhysicalColumns = producedType.getFieldCount();
+            projectedMetadataColumns = Collections.emptyList();
        }
-        // apply metadata push down
-        int[][] projectedFields =
+        final int[][] physicalProjections;
+        if (supportsProjectionPushDown(source.tableSource())) {
+            projectedMetadataColumns.forEach(
+                    metaColumn -> projectedSchema.columns().remove(metaColumn.name()));
+
+            physicalProjections = NestedProjectionUtil.convertToIndexArray(projectedSchema);
+
+            projectedMetadataColumns.forEach(
+                    metaColumn -> projectedSchema.columns().put(metaColumn.name(), metaColumn));
+        } else {
+            physicalProjections =
+                    IntStream.range(0, numPhysicalColumns)
+                            .mapToObj(columnIndex -> new int[] {columnIndex})
+                            .toArray(int[][]::new);
+        }
+
+        final int[][] projectedFields =
                Stream.concat(
-                                Stream.of(projectedPhysicalFields),
-                                usedMetaDataFields.stream()
-                                        .map(field -> new int[] {field.indexInOriginSchema()}))
+                                Stream.of(physicalProjections),
+                                projectedMetadataColumns.stream()
+                                        .map(NestedColumn::indexInOriginSchema)
+                                        .map(columnIndex -> new int[] {columnIndex}))
                        .toArray(int[][]::new);
-        RowType newProducedType =
+
+        int newIndex = physicalProjections.length;
+        for (NestedColumn metaColumn : projectedMetadataColumns) {
+            metaColumn.setIndexOfLeafInNewSchema(newIndex++);
+        }
+
+        final RowType newProducedType =
                (RowType)
                        DataTypeUtils.projectRow(
-                                        TypeConversions.fromLogicalToDataType(originType),
+                                        TypeConversions.fromLogicalToDataType(producedType),
                                        projectedFields)
                                .getLogicalType();
-        sourceAbilitySpecs.add(new ProjectPushDownSpec(projectedPhysicalFields, newProducedType));
-        sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType));
+        if (supportsProjectionPushDown(source.tableSource())) {
Review comment:
we can also save the results of calling these methods in variables early
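e.g. at the top of the method:
```java
final boolean supportsProjectionPushDown = supportsProjectionPushDown(source.tableSource());
final boolean supportsMetadata = supportsMetadata(source.tableSource());
```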
##########
File path:
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -243,4 +270,153 @@ public void testProjectFieldAccessWithITEM() {
+ "`outer_map`['item'] "
+ "FROM ItemTable");
}
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupported() {
+        createMetadataTableWithoutProjectionPushDown("T1", true);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T1");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupported() {
+        createMetadataTableWithoutProjectionPushDown("T2", false);
+
+        util().verifyRelPlan("SELECT m1, metadata FROM T2");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T3", true);
+
+        util().verifyRelPlan("SELECT 1 FROM T3");
+        assertThat(MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(), hasSize(0));
+    }
+
+    @Test
+    public void testMetadataProjectionWithoutProjectionPushDownWhenNotSupportedAndNoneSelected() {
+        createMetadataTableWithoutProjectionPushDown("T4", false);
+
+        util().verifyRelPlan("SELECT 1 FROM T4");
+        assertThat(
+                MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.get(),
+                contains("m1", "m2", "m3"));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    private void createMetadataTableWithoutProjectionPushDown(
+            String name, boolean supportsMetadataProjection) {
+        util().tableEnv()
+                .createTable(
+                        name,
+                        TableDescriptor.forConnector(
+                                        MetadataNoProjectionPushDownTableFactory.IDENTIFIER)
+                                .schema(
+                                        Schema.newBuilder()
+                                                .columnByMetadata("m1", STRING())
+                                                .columnByMetadata("metadata", STRING(), "m2")
+                                                .columnByMetadata("m3", STRING())
+                                                .build())
+                                .option(SUPPORTS_METADATA_PROJECTION, supportsMetadataProjection)
+                                .build());
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /** Factory for {@link Source}. */
+    public static class MetadataNoProjectionPushDownTableFactory
+            implements DynamicTableSourceFactory {
+        public static final String IDENTIFIER = "metadataNoProjectionPushDown";
+
+        public static final ConfigOption<Boolean> SUPPORTS_METADATA_PROJECTION =
+                ConfigOptions.key("supports-metadata-projection").booleanType().defaultValue(true);
+
+        public static ThreadLocal<List<String>> appliedMetadataKeys = new ThreadLocal<>();
+
+        @Override
+        public String factoryIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public Set<ConfigOption<?>> requiredOptions() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public Set<ConfigOption<?>> optionalOptions() {
+            return Collections.singleton(SUPPORTS_METADATA_PROJECTION);
+        }
+
+        @Override
+        public DynamicTableSource createDynamicTableSource(Context context) {
+            FactoryUtil.TableFactoryHelper helper =
+                    FactoryUtil.createTableFactoryHelper(this, context);
+            return new Source(helper.getOptions());
+        }
+    }
+
+    private static class Source implements ScanTableSource, SupportsReadingMetadata {
+
+        private final ReadableConfig options;
+
+        public Source(ReadableConfig options) {
+            this.options = options;
+            MetadataNoProjectionPushDownTableFactory.appliedMetadataKeys.remove();
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+
+        @Override
+        public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+            return SourceFunctionProvider.of(
Review comment:
shouldn't throwing an `UnsupportedOperationException` be enough here?
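i.e. (sketch):
```java
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    throw new UnsupportedOperationException("Not required for plan testing.");
}
```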
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRule.java
##########
@@ -52,222 +54,324 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.LinkedList;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createProducedType;
+import static org.apache.flink.table.planner.connectors.DynamicSourceUtils.createRequiredMetadataKeys;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory;
+
/**
- * Planner rule that pushes a {@link LogicalProject} into a {@link LogicalTableScan} which wraps a
- * {@link SupportsProjectionPushDown} dynamic table source.
+ * Pushes a {@link LogicalProject} into a {@link LogicalTableScan}.
+ *
+ * <p>If the source implements {@link SupportsProjectionPushDown} this rule pushes the projection of
+ * physical columns into the source.
+ *
+ * <p>If the source implements {@link SupportsReadingMetadata} this rule also pushes projected
+ * metadata into the source. For sources implementing {@link SupportsReadingMetadata} but not {@link
+ * SupportsProjectionPushDown} this is only done if the source indicates that metadata should be
+ * projected. This is important for some sources which would not be re-usable if different instances
+ * (due to different projected metadata) of the source were used together.
 */
-public class PushProjectIntoTableSourceScanRule extends RelOptRule {
-    public static final PushProjectIntoTableSourceScanRule INSTANCE =
-            new PushProjectIntoTableSourceScanRule();
-
-    public PushProjectIntoTableSourceScanRule() {
-        super(
-                operand(LogicalProject.class, operand(LogicalTableScan.class, none())),
-                "PushProjectIntoTableSourceScanRule");
+@Internal
+public class PushProjectIntoTableSourceScanRule
+        extends RelRule<PushProjectIntoTableSourceScanRule.Config> {
+
+    public static final RelOptRule INSTANCE =
+            Config.EMPTY.as(Config.class).onProjectedScan().toRule();
+
+    public PushProjectIntoTableSourceScanRule(Config config) {
+        super(config);
    }

    @Override
    public boolean matches(RelOptRuleCall call) {
-        LogicalTableScan scan = call.rel(1);
-        TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        if (tableSourceTable == null
-                || !(tableSourceTable.tableSource() instanceof SupportsProjectionPushDown)) {
+        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable sourceTable = scan.getTable().unwrap(TableSourceTable.class);
+        if (sourceTable == null) {
            return false;
        }
-        return Arrays.stream(tableSourceTable.abilitySpecs())
-                .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+
+        // The source supports projection push-down.
+        if (supportsProjectionPushDown(sourceTable.tableSource())) {
+            return Arrays.stream(sourceTable.abilitySpecs())
+                    .noneMatch(spec -> spec instanceof ProjectPushDownSpec);
+        }
+
+        // The source supports metadata and wants them to be projected even if projection push-down
+        // (for physical columns) is not supported.
+        if (supportsMetadata(sourceTable.tableSource())) {
+            if (Arrays.stream(sourceTable.abilitySpecs())
+                    .anyMatch(spec -> spec instanceof ReadingMetadataSpec)) {
+                return false;
+            }
+
+            return ((SupportsReadingMetadata) sourceTable.tableSource())
+                    .supportsMetadataProjection();
+        }
+
+        return false;
    }

    @Override
    public void onMatch(RelOptRuleCall call) {
        final LogicalProject project = call.rel(0);
        final LogicalTableScan scan = call.rel(1);
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+
+        final boolean supportsNestedProjection = supportsNestedProjection(source.tableSource());
        final int[] refFields = RexNodeExtractor.extractRefInputFields(project.getProjects());
-        TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
-        final ResolvedSchema oldSchema = oldTableSourceTable.catalogTable().getResolvedSchema();
-        final DynamicTableSource oldSource = oldTableSourceTable.tableSource();
-        final TableConfig config = ShortcutUtils.unwrapContext(scan).getTableConfig();
-
-        final boolean supportsNestedProjection =
-                ((SupportsProjectionPushDown) oldTableSourceTable.tableSource())
-                        .supportsNestedProjection();
-        List<String> fieldNames = scan.getRowType().getFieldNames();
-
-        if (!supportsNestedProjection && refFields.length == fieldNames.size()) {
-            // just keep as same as the old plan
-            // TODO: refactor the affected plan
+        if (!supportsNestedProjection && refFields.length == scan.getRowType().getFieldCount()) {
+            // There is no top-level projection and nested projections aren't supported.
            return;
        }
-        List<RexNode> oldProjectsWithPK = new ArrayList<>(project.getProjects());
-        FlinkTypeFactory flinkTypeFactory = ShortcutUtils.unwrapTypeFactory(scan);
-        if (isPrimaryKeyFieldsRequired(oldTableSourceTable, config)) {
-            // add pk into projects for upsert source
-            oldSchema
-                    .getPrimaryKey()
-                    .ifPresent(
-                            pks -> {
-                                for (String name : pks.getColumns()) {
-                                    int index = fieldNames.indexOf(name);
-                                    Column col = oldSchema.getColumn(index).get();
-                                    oldProjectsWithPK.add(
-                                            new RexInputRef(
-                                                    index,
-                                                    flinkTypeFactory.createFieldTypeFromLogicalType(
-                                                            col.getDataType().getLogicalType())));
-                                }
-                            });
-        }
-        // build used schema tree
-        RowType originType = DynamicSourceUtils.createProducedType(oldSchema, oldSource);
-        NestedSchema nestedSchema =
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final ResolvedSchema schema = source.catalogTable().getResolvedSchema();
+        final RowType producedType = createProducedType(schema, source.tableSource());
+        final NestedSchema projectedSchema =
                NestedProjectionUtil.build(
-                        oldProjectsWithPK, flinkTypeFactory.buildRelNodeRowType(originType));
+                        getProjections(project, scan),
+                        typeFactory.buildRelNodeRowType(producedType));
        if (!supportsNestedProjection) {
-            // mark the fields in the top level as leaf
-            for (NestedColumn column : nestedSchema.columns().values()) {
+            for (NestedColumn column : projectedSchema.columns().values()) {
                column.markLeaf();
            }
        }
-        DataType producedDataType = TypeConversions.fromLogicalToDataType(originType);
-
-        List<SourceAbilitySpec> sourceAbilitySpecs = new ArrayList<>();
-        RowType newProducedType;
-        if (oldSource instanceof SupportsReadingMetadata) {
-            List<String> metadataKeys =
-                    DynamicSourceUtils.createRequiredMetadataKeys(oldSchema, oldSource);
-            newProducedType =
-                    applyPhysicalAndMetadataPushDown(
-                            nestedSchema, metadataKeys, originType, sourceAbilitySpecs);
-        } else {
-            int[][] projectedFields = NestedProjectionUtil.convertToIndexArray(nestedSchema);
-            newProducedType =
-                    (RowType)
-                            DataTypeUtils.projectRow(producedDataType, projectedFields)
-                                    .getLogicalType();
-            sourceAbilitySpecs.add(new ProjectPushDownSpec(projectedFields, newProducedType));
-        }
-        DynamicTableSource newSource = oldSource.copy();
-        SourceAbilityContext context = SourceAbilityContext.from(scan);
-        for (SourceAbilitySpec abilitySpec : sourceAbilitySpecs) {
-            abilitySpec.apply(newSource, context);
-        }
+        final List<SourceAbilitySpec> abilitySpecs = new ArrayList<>();
+        final RowType newProducedType =
+                performPushDown(source, projectedSchema, producedType, abilitySpecs);
-        RelDataType newRowType = flinkTypeFactory.buildRelNodeRowType(newProducedType);
+        final DynamicTableSource newTableSource = source.tableSource().copy();
+        final SourceAbilityContext context = SourceAbilityContext.from(scan);
+        abilitySpecs.forEach(spec -> spec.apply(newTableSource, context));
-        // project push down does not change the statistic, we can reuse origin statistic
-        TableSourceTable newTableSourceTable =
-                oldTableSourceTable.copy(
-                        newSource,
+        final RelDataType newRowType = typeFactory.buildRelNodeRowType(newProducedType);
+        final TableSourceTable newSource =
+                source.copy(
+                        newTableSource,
                        newRowType,
-                        getExtraDigests(newRowType, sourceAbilitySpecs),
-                        sourceAbilitySpecs.toArray(new SourceAbilitySpec[0]));
-        LogicalTableScan newScan =
+                        getExtraDigests(abilitySpecs),
+                        abilitySpecs.toArray(new SourceAbilitySpec[0]));
+        final LogicalTableScan newScan =
                new LogicalTableScan(
-                        scan.getCluster(),
-                        scan.getTraitSet(),
-                        scan.getHints(),
-                        newTableSourceTable);
-        // rewrite the input field in projections
-        // the origin projections are enough. Because the upsert source only uses pk info
-        // normalization node.
-        List<RexNode> newProjects =
-                NestedProjectionUtil.rewrite(
-                        project.getProjects(), nestedSchema, call.builder().getRexBuilder());
-        // rewrite new source
-        LogicalProject newProject =
-                project.copy(project.getTraitSet(), newScan, newProjects, project.getRowType());
+                        scan.getCluster(), scan.getTraitSet(), scan.getHints(), newSource);
+        final LogicalProject newProject =
+                project.copy(
+                        project.getTraitSet(),
+                        newScan,
+                        rewriteProjections(call, newSource, projectedSchema),
+                        project.getRowType());
        if (ProjectRemoveRule.isTrivial(newProject)) {
-            // drop project if the transformed program merely returns its input
            call.transformTo(newScan);
        } else {
            call.transformTo(newProject);
        }
    }
-    private static String[] getExtraDigests(
-            RelDataType rowType, List<SourceAbilitySpec> abilitySpecs) {
-        final List<String> digests = new ArrayList<>();
-        digests.add(String.format("project=[%s]", String.join(", ", rowType.getFieldNames())));
+    private boolean supportsProjectionPushDown(DynamicTableSource tableSource) {
+        return tableSource instanceof SupportsProjectionPushDown;
+    }
-        for (SourceAbilitySpec abilitySpec : abilitySpecs) {
-            if (abilitySpec instanceof ReadingMetadataSpec) {
-                final ReadingMetadataSpec metadataSpec = (ReadingMetadataSpec) abilitySpec;
-                digests.add(
-                        String.format(
-                                "metadata=[%s]",
-                                String.join(", ", metadataSpec.getMetadataKeys())));
-            }
+    private boolean supportsMetadata(DynamicTableSource tableSource) {
+        return tableSource instanceof SupportsReadingMetadata;
+    }
+
+    private boolean supportsNestedProjection(DynamicTableSource tableSource) {
+        return supportsProjectionPushDown(tableSource)
+                && ((SupportsProjectionPushDown) tableSource).supportsNestedProjection();
+    }
+
+    private List<RexNode> getProjections(LogicalProject project, LogicalTableScan scan) {
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+        final TableConfig tableConfig = unwrapContext(scan).getTableConfig();
+
+        final List<RexNode> projections = new ArrayList<>(project.getProjects());
+        if (supportsProjectionPushDown(source.tableSource())
+                && requiresPrimaryKey(source, tableConfig)) {
+            projections.addAll(getPrimaryKeyProjections(scan));
        }
-        return digests.toArray(new String[0]);
+        return projections;
    }
-    /** Returns true if the primary key is required and should be retained. */
-    private static boolean isPrimaryKeyFieldsRequired(TableSourceTable table, TableConfig config) {
+    private static boolean requiresPrimaryKey(TableSourceTable table, TableConfig config) {
        return DynamicSourceUtils.isUpsertSource(table.catalogTable(), table.tableSource())
                || DynamicSourceUtils.isSourceChangeEventsDuplicate(
                        table.catalogTable(), table.tableSource(), config);
    }
-    /**
-     * Push the used physical column and metadata into table source. The returned value is used to
-     * build new table schema.
-     */
-    private static RowType applyPhysicalAndMetadataPushDown(
-            NestedSchema nestedSchema,
-            List<String> metadataKeys,
-            RowType originType,
-            List<SourceAbilitySpec> sourceAbilitySpecs) {
-        // TODO: supports nested projection for metadata
-        List<NestedColumn> usedMetaDataFields = new LinkedList<>();
-        int physicalCount = originType.getFieldCount() - metadataKeys.size();
-        List<String> fieldNames = originType.getFieldNames();
-
-        // rm metadata in the tree
-        for (int i = 0; i < metadataKeys.size(); i++) {
-            NestedColumn usedMetadata =
-                    nestedSchema.columns().remove(fieldNames.get(i + physicalCount));
-            if (usedMetadata != null) {
-                usedMetaDataFields.add(usedMetadata);
-            }
+    private List<RexNode> getPrimaryKeyProjections(LogicalTableScan scan) {
+        final TableSourceTable source = scan.getTable().unwrap(TableSourceTable.class);
+        final ResolvedSchema schema = source.catalogTable().getResolvedSchema();
+        if (!schema.getPrimaryKey().isPresent()) {
+            return Collections.emptyList();
        }
-        // get path of the used fields
-        int[][] projectedPhysicalFields = NestedProjectionUtil.convertToIndexArray(nestedSchema);
+        final FlinkTypeFactory typeFactory = unwrapTypeFactory(scan);
+        final UniqueConstraint primaryKey = schema.getPrimaryKey().get();
+        return primaryKey.getColumns().stream()
+                .map(
+                        columnName -> {
+                            final int idx = scan.getRowType().getFieldNames().indexOf(columnName);
+                            final Column column =
+                                    schema.getColumn(idx)
+                                            .orElseThrow(
+                                                    () ->
+                                                            new TableException(
+                                                                    String.format(
+                                                                            "Column at index %d not found.",
+                                                                            idx)));
+                            return new RexInputRef(
+                                    idx,
+                                    typeFactory.createFieldTypeFromLogicalType(
+                                            column.getDataType().getLogicalType()));
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private RowType performPushDown(
+            TableSourceTable source,
+            NestedSchema projectedSchema,
+            RowType producedType,
+            List<SourceAbilitySpec> abilitySpecs) {
+        final int numPhysicalColumns;
+        final List<NestedColumn> projectedMetadataColumns;
+        if (supportsMetadata(source.tableSource())) {
+            final List<String> declaredMetadataKeys =
+                    createRequiredMetadataKeys(
+                            source.catalogTable().getResolvedSchema(), source.tableSource());
-        // push the metadata back for later rewrite and extract the location in the origin row
-        int newIndex = projectedPhysicalFields.length;
-        List<String> usedMetadataNames = new LinkedList<>();
-        for (NestedColumn metadata : usedMetaDataFields) {
-            metadata.setIndexOfLeafInNewSchema(newIndex++);
-            nestedSchema.columns().put(metadata.name(), metadata);
-            usedMetadataNames.add(metadataKeys.get(metadata.indexInOriginSchema() - physicalCount));
+            numPhysicalColumns = producedType.getFieldCount() - declaredMetadataKeys.size();
+
+            projectedMetadataColumns =
+                    IntStream.range(0, declaredMetadataKeys.size())
+                            .mapToObj(i -> producedType.getFieldNames().get(numPhysicalColumns + i))
+                            .map(fieldName -> projectedSchema.columns().get(fieldName))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+        } else {
+            numPhysicalColumns = producedType.getFieldCount();
+            projectedMetadataColumns = Collections.emptyList();
        }
-        // apply metadata push down
-        int[][] projectedFields =
+        final int[][] physicalProjections;
+        if (supportsProjectionPushDown(source.tableSource())) {
+            projectedMetadataColumns.forEach(
+                    metaColumn -> projectedSchema.columns().remove(metaColumn.name()));
+
+            physicalProjections = NestedProjectionUtil.convertToIndexArray(projectedSchema);
+
+            projectedMetadataColumns.forEach(
+                    metaColumn -> projectedSchema.columns().put(metaColumn.name(), metaColumn));
+        } else {
+            physicalProjections =
+                    IntStream.range(0, numPhysicalColumns)
+                            .mapToObj(columnIndex -> new int[] {columnIndex})
+                            .toArray(int[][]::new);
+        }
+
+        final int[][] projectedFields =
                Stream.concat(
-                                Stream.of(projectedPhysicalFields),
-                                usedMetaDataFields.stream()
-                                        .map(field -> new int[] {field.indexInOriginSchema()}))
+                                Stream.of(physicalProjections),
+                                projectedMetadataColumns.stream()
+                                        .map(NestedColumn::indexInOriginSchema)
+                                        .map(columnIndex -> new int[] {columnIndex}))
                        .toArray(int[][]::new);
-        RowType newProducedType =
+
+        int newIndex = physicalProjections.length;
+        for (NestedColumn metaColumn : projectedMetadataColumns) {
+            metaColumn.setIndexOfLeafInNewSchema(newIndex++);
+        }
+
+        final RowType newProducedType =
                (RowType)
                        DataTypeUtils.projectRow(
-                                        TypeConversions.fromLogicalToDataType(originType),
+                                        TypeConversions.fromLogicalToDataType(producedType),
                                        projectedFields)
                                .getLogicalType();
-        sourceAbilitySpecs.add(new ProjectPushDownSpec(projectedPhysicalFields, newProducedType));
-        sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType));
+        if (supportsProjectionPushDown(source.tableSource())) {
+            abilitySpecs.add(new ProjectPushDownSpec(physicalProjections, newProducedType));
+        }
+
+        if (supportsMetadata(source.tableSource())) {
+            final List<String> projectedMetadataKeys =
+                    projectedMetadataColumns.stream()
+                            .map(NestedColumn::name)
+                            .collect(Collectors.toList());
+
+            abilitySpecs.add(new ReadingMetadataSpec(projectedMetadataKeys, newProducedType));
+        }
+
        return newProducedType;
    }
+
+    private List<RexNode> rewriteProjections(
+            RelOptRuleCall call, TableSourceTable source, NestedSchema projectedSchema) {
+        final LogicalProject project = call.rel(0);
+        if (supportsProjectionPushDown(source.tableSource())) {
+            return NestedProjectionUtil.rewrite(
+                    project.getProjects(), projectedSchema, call.builder().getRexBuilder());
+        } else {
+            return project.getProjects();
+        }
+    }
+
+    private static String[] getExtraDigests(List<SourceAbilitySpec> abilitySpecs) {
+        final List<String> digests = new ArrayList<>();
+        for (SourceAbilitySpec abilitySpec : abilitySpecs) {
+            if (abilitySpec instanceof ProjectPushDownSpec) {
+                digests.add(formatPushDownDigest((ProjectPushDownSpec) abilitySpec));
+            } else if (abilitySpec instanceof ReadingMetadataSpec) {
+                digests.add(formatMetadataDigest((ReadingMetadataSpec) abilitySpec));
+            }
+        }
+
+        return digests.toArray(new String[0]);
+    }
+
+    private static String formatPushDownDigest(ProjectPushDownSpec pushDownSpec) {
+        final List<String> fieldNames =
+                pushDownSpec
+                        .getProducedType()
+                        .orElseThrow(() -> new TableException("Produced data type is not present."))
+                        .getFieldNames();
+
+        return String.format("project=[%s]", String.join(", ", fieldNames));
+    }
+
+    private static String formatMetadataDigest(ReadingMetadataSpec metadataSpec) {
+        return String.format("metadata=[%s]", String.join(", ", metadataSpec.getMetadataKeys()));
+    }
+
+    // ---------------------------------------------------------------------------------------------
+
+    /** Configuration for {@link PushProjectIntoTableSourceScanRule}. */
+    public interface Config extends RelRule.Config {
Review comment:
nit: private?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]