twalthr commented on a change in pull request #17768:
URL: https://github.com/apache/flink/pull/17768#discussion_r749114621
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.utils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.util.Arrays; +import java.util.Objects; + +/** + * An implementation of {@link RowData} which provides a projected view of the underlying {@link Review comment: mention that: - projection includes both reduction of fields as well as reordering - it does not support nested projection, only top-level ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.data.utils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.util.Arrays; +import java.util.Objects; + +/** + * An implementation of {@link RowData} which provides a projected view of the underlying {@link + * RowData}. 
+ */ +@PublicEvolving +public class ProjectedRowData implements RowData { + + private final int[] indexMapping; + + private RowData row; + + private ProjectedRowData(int[] indexMapping) { + this.indexMapping = indexMapping; + } + + /** + * Replaces the underlying {@link RowData} backing this {@link ProjectedRowData}. + * + * <p>This method replaces the row data in place and does not return a new object. This is done + * for performance reasons. + */ + public ProjectedRowData replaceRow(RowData row) { + this.row = row; + return this; + } + + // --------------------------------------------------------------------------------------------- + + @Override + public int getArity() { + return indexMapping.length; + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + row.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return row.isNullAt(indexMapping[pos]); + } + + @Override + public boolean getBoolean(int pos) { + return row.getBoolean(indexMapping[pos]); + } + + @Override + public byte getByte(int pos) { + return row.getByte(indexMapping[pos]); + } + + @Override + public short getShort(int pos) { + return row.getShort(indexMapping[pos]); + } + + @Override + public int getInt(int pos) { + return row.getInt(indexMapping[pos]); + } + + @Override + public long getLong(int pos) { + return row.getLong(indexMapping[pos]); + } + + @Override + public float getFloat(int pos) { + return row.getFloat(indexMapping[pos]); + } + + @Override + public double getDouble(int pos) { + return row.getDouble(indexMapping[pos]); + } + + @Override + public StringData getString(int pos) { + return row.getString(indexMapping[pos]); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return row.getDecimal(indexMapping[pos], precision, scale); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return row.getTimestamp(indexMapping[pos], precision); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + return row.getRawValue(indexMapping[pos]); + } + + @Override + public byte[] getBinary(int pos) { + return row.getBinary(indexMapping[pos]); + } + + @Override + public ArrayData getArray(int pos) { + return row.getArray(indexMapping[pos]); + } + + @Override + public MapData getMap(int pos) { + return row.getMap(indexMapping[pos]); + } + + @Override + public RowData getRow(int pos, int numFields) { + return row.getRow(indexMapping[pos], numFields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ProjectedRowData that = (ProjectedRowData) o; + return Arrays.equals(indexMapping, that.indexMapping) && Objects.equals(row, that.row); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(indexMapping), row); + } + + @Override + public String toString() { + return row.getRowKind().shortString() + + "{" + + "indexMapping=" + + Arrays.toString(indexMapping) + + ", mutableRow=" + + row + + '}'; + } + + /** + * Like {@link #from(int[])}, but throws {@link IllegalArgumentException} if the provided {@code + * projection} array contains nested projections, which are not supported by {@link + * ProjectedRowData}. + */ + public static ProjectedRowData from(int[][] projection) { Review comment: explain the argument, sometimes people are confused by index paths. 
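For example, a usage snippet roughly like this could go into the Javadoc (just a sketch; the row type, field names, and values are made up for illustration):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.ProjectedRowData;

// Underlying row type: ROW<a INT, b STRING>. Each inner array is an "index path"
// into that type: {1} selects the top-level field `b`, while a path such as {1, 0}
// would point into a nested row and is rejected by this factory with an
// IllegalArgumentException.
final ProjectedRowData projected = ProjectedRowData.from(new int[][] {{1}, {0}});
projected.replaceRow(GenericRowData.of(42, StringData.fromString("hello")));

// The view reorders (and could also drop) fields: projected position 0 reads
// underlying position 1, and vice versa.
// projected.getString(0) -> "hello"
// projected.getInt(1)    -> 42
```

Something like that would also make the difference to `from(int[])` (plain top-level indexes) obvious at a glance.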
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.format; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.projection.Projection; + +/** + * For more details on usage and differences between {@link DecodingFormat} and {@link + * ProjectableDecodingFormat}, check the documentation of {@link DecodingFormat}. + */ +@PublicEvolving +public interface ProjectableDecodingFormat<I> extends DecodingFormat<I> { + + /** Returns whether this format supports nested projection. */ + default boolean supportsNestedProjection() { + return false; + } + + /** + * Creates runtime decoder implementation that is configured to produce data of type {@code + * DataType.projectFields(physicalDataType, projections)}. For more details on the usage, check + * {@link DecodingFormat} documentation. + * + * @param context the context provides several utilities required to instantiate the runtime + * decoder implementation of the format + * @param physicalDataType For more details check {@link DecodingFormat} + * @param projections the projections array. The projections can be nested only if {@link Review comment: explain that this array describes `index paths` that allow arbitrary nesting ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/ProjectedRowData.java ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.data.utils; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RawValueData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.types.RowKind; + +import java.util.Arrays; +import java.util.Objects; + +/** + * An implementation of {@link RowData} which provides a projected view of the underlying {@link + * RowData}. + */ +@PublicEvolving +public class ProjectedRowData implements RowData { + + private final int[] indexMapping; + + private RowData row; + + private ProjectedRowData(int[] indexMapping) { + this.indexMapping = indexMapping; + } + + /** + * Replaces the underlying {@link RowData} backing this {@link ProjectedRowData}. + * + * <p>This method replaces the row data in place and does not return a new object. This is done + * for performance reasons. + */ + public ProjectedRowData replaceRow(RowData row) { + this.row = row; + return this; + } + + // --------------------------------------------------------------------------------------------- + + @Override + public int getArity() { + return indexMapping.length; + } + + @Override + public RowKind getRowKind() { + return row.getRowKind(); + } + + @Override + public void setRowKind(RowKind kind) { + row.setRowKind(kind); + } + + @Override + public boolean isNullAt(int pos) { + return row.isNullAt(indexMapping[pos]); + } + + @Override + public boolean getBoolean(int pos) { + return row.getBoolean(indexMapping[pos]); + } + + @Override + public byte getByte(int pos) { + return row.getByte(indexMapping[pos]); + } + + @Override + public short getShort(int pos) { + return row.getShort(indexMapping[pos]); + } + + @Override + public int getInt(int pos) { + return row.getInt(indexMapping[pos]); + } + + @Override + public long getLong(int pos) { + return row.getLong(indexMapping[pos]); + } + + @Override + public float getFloat(int pos) { + return row.getFloat(indexMapping[pos]); + } + + @Override + public double getDouble(int pos) { + return row.getDouble(indexMapping[pos]); + } + + @Override + public StringData getString(int pos) { + return row.getString(indexMapping[pos]); + } + + @Override + public DecimalData getDecimal(int pos, int precision, int scale) { + return row.getDecimal(indexMapping[pos], precision, scale); + } + + @Override + public TimestampData getTimestamp(int pos, int precision) { + return row.getTimestamp(indexMapping[pos], precision); + } + + @Override + public <T> RawValueData<T> getRawValue(int pos) { + return row.getRawValue(indexMapping[pos]); + } + + @Override + public byte[] getBinary(int pos) { + return row.getBinary(indexMapping[pos]); + } + + @Override + public ArrayData getArray(int pos) { + return row.getArray(indexMapping[pos]); + } + + @Override + public MapData getMap(int pos) { + return row.getMap(indexMapping[pos]); + } + + @Override + public RowData getRow(int pos, int numFields) { + return row.getRow(indexMapping[pos], numFields); + } + + @Override + public boolean equals(Object o) { + if (this == o) { Review comment: throw `UnsupportedOperationExceptions` for `equals/hashCode`. the current implementation doesn't help when comparing different `RowData` instances. 
and even comparing only two projected rows does not compare the projected fields individually. ########## File path: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java ########## @@ -43,7 +44,8 @@ import java.util.stream.Stream; /** {@link DecodingFormat} for Canal using JSON encoding. */ -public class CanalJsonDecodingFormat implements DecodingFormat<DeserializationSchema<RowData>> { +public class CanalJsonDecodingFormat Review comment: Shall we keep `DecodingFormat<DeserializationSchema<RowData>>` non-projected for now? I would have a better feeling here, because I'm not sure how good our tests are. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/Projection.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link Projection} represents a list of (possibly nested) indexes that can be used to project + * data types. + */ +@Internal +public interface Projection { Review comment: The implementation is relatively simply. Let's simplify the architecture and fuse all classes into one file with private static classes. Then we don't need a dedicated package and we have a nice compact utility. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/EmptyProjection.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.table.types.DataType; + +import java.util.stream.IntStream; + +class EmptyProjection extends AbstractProjection { + + static final EmptyProjection INSTANCE = new EmptyProjection(); + + private EmptyProjection() {} + + @Override + public DataType project(DataType dataType) { + return dataType; Review comment: call `DataType.projectFields` as well? ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java ########## @@ -176,23 +179,47 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { && filters.size() > 0) { ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters); } - BulkFormat<RowData, FileSourceSplit> bulkFormat = + + BulkFormat<RowData, FileSourceSplit> format = Review comment: or even methods because I see duplicate code below ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/AbstractProjection.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; Review comment: maybe `org.apache.flink.table.utils.projection` is a better location? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/EmptyProjection.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.table.types.DataType; + +import java.util.stream.IntStream; + +class EmptyProjection extends AbstractProjection { + + static final EmptyProjection INSTANCE = new EmptyProjection(); + + private EmptyProjection() {} + + @Override + public DataType project(DataType dataType) { + return dataType; Review comment: isn't an empty projection an empty row type? 
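To make the two possible readings concrete (a sketch, assuming the `DataType.projectFields` utility mentioned above performs a plain field selection by index paths):

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

final DataType physical =
        DataTypes.ROW(
                DataTypes.FIELD("a", DataTypes.INT()),
                DataTypes.FIELD("b", DataTypes.STRING()));

// Current behavior of EmptyProjection#project: a no-op, `physical` is returned as-is.

// If it instead delegated to the shared projection utility with zero index paths,
// the result would be an empty row type:
final DataType projected = DataType.projectFields(physical, new int[0][]);
// projected == ROW<>  (a row with zero fields), not ROW<a INT, b STRING>
```

Whichever semantics is intended, the JavaDocs should spell it out.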
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/Projection.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link Projection} represents a list of (possibly nested) indexes that can be used to project + * data types. + */ +@Internal +public interface Projection { + + /** Project the provided {@link DataType} using this {@link Projection}. */ + DataType project(DataType dataType); + + /** @return {@code true} whether this projection is nested or not. */ + boolean isNested(); + + /** + * Perform a difference of this {@link Projection} with another {@link Projection}. The result + * of this operation is a new {@link Projection} retaining the same ordering of this instance + * but with the indexes from {@code other} removed. For example: + * + * <pre> + * <code> + * [4, 1, 0, 3, 2] - [4, 2] = [1, 0, 2] + * </code> + * </pre> + * + * <p>Note how the index {@code 3} in the minuend becomes {@code 2} because it's rescaled to + * project correctly a {@link RowData} or arity 3. + * + * @param other the subtrahend + * @throws IllegalArgumentException when {@code other} is nested. + */ + Projection difference(Projection other); + + /** + * Complement this projection. The returned projection is an ordered projection of fields from 0 + * to {@code fieldsNumber} except the indexes in this {@link Projection}. For example: + * + * <pre> + * <code> + * [4, 2].complement(5) = [0, 1, 3] + * </code> + * </pre> + * + * @param fieldsNumber the size of the universe + * @throws IllegalStateException if this projection is nested. + */ + Projection complement(int fieldsNumber); + + /** + * Convert this instance to a projection of top level indexes. + * + * @throws IllegalStateException if this projection is nested. + */ + int[] toTopLevelIndexes(); + + /** Convert this instance to a nested projection. */ Review comment: Explain what that means? Explain index paths. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/EmptyProjection.java ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.table.types.DataType; + +import java.util.stream.IntStream; + +class EmptyProjection extends AbstractProjection { Review comment: Please add JavaDocs to all classes. Even an `empty projection` could be misleading whether it is a no-op or an empty row afterwards. ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java ########## @@ -176,23 +179,47 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { && filters.size() > 0) { ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters); } - BulkFormat<RowData, FileSourceSplit> bulkFormat = + + BulkFormat<RowData, FileSourceSplit> format = Review comment: this is pretty nested. maybe a dedicated `if/else` with intermediate local variables? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/ProjectableDecodingFormat.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.connector.format; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.projection.Projection; + +/** + * For more details on usage and differences between {@link DecodingFormat} and {@link Review comment: add one paragraph to quickly describe the interface in one sentence before referring to the full essay. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/Projection.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link Projection} represents a list of (possibly nested) indexes that can be used to project + * data types. + */ +@Internal Review comment: mark it `@PublicEvolving`? I think this is very useful to connector implementers and has a good architecture. ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/projection/Projection.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.projection; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import java.util.List; +import java.util.stream.IntStream; + +/** + * {@link Projection} represents a list of (possibly nested) indexes that can be used to project + * data types. + */ +@Internal +public interface Projection { + + /** Project the provided {@link DataType} using this {@link Projection}. */ + DataType project(DataType dataType); + + /** @return {@code true} whether this projection is nested or not. */ + boolean isNested(); + + /** + * Perform a difference of this {@link Projection} with another {@link Projection}. The result + * of this operation is a new {@link Projection} retaining the same ordering of this instance + * but with the indexes from {@code other} removed. For example: + * + * <pre> + * <code> + * [4, 1, 0, 3, 2] - [4, 2] = [1, 0, 2] + * </code> + * </pre> + * + * <p>Note how the index {@code 3} in the minuend becomes {@code 2} because it's rescaled to Review comment: This sounds very confusing. The input row should not have changed. Maybe `difference` needs a different definition or a better name. Is the difference assuming that a projection has been done "before" calling this projection? 
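Spelling out the Javadoc example end-to-end might already answer that (a sketch against the API in this PR, using `Projection.of` with top-level index paths as in `FileSystemTableSource`):

```java
// Both projections are expressed against the same 5-field source row.
final Projection keep    = Projection.of(new int[][] {{4}, {1}, {0}, {3}, {2}});
final Projection removed = Projection.of(new int[][] {{4}, {2}});

final Projection remaining = keep.difference(removed);
// remaining.toTopLevelIndexes() == [1, 0, 2]
//
// The result is NOT meant to be applied to the original 5-field row, but to the
// row that is left after the fields of `removed` are gone, i.e. after applying
// removed.complement(5) == [0, 1, 3]:
//   original index 1 -> position 1 in the reduced row
//   original index 0 -> position 0
//   original index 3 -> position 2 (fields 2 and 4 no longer exist)
```

If that reading is correct, stating the target row explicitly in the Javadoc would already remove most of the confusion.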
########## File path: flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java ########## @@ -120,24 +119,16 @@ public ChangelogMode getChangelogMode() { @Override public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { - List<String> schemaFields = - DataType.getFieldNames(context.getPhysicalRowDataType()).stream() - .filter( - field -> - !context.getCatalogTable() - .getPartitionKeys() - .contains(field)) - .collect(Collectors.toList()); - return new DecodingFormat<DeserializationSchema<RowData>>() { + return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { Review comment: let's not implement this for CSV? ########## File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java ########## @@ -120,38 +122,39 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null)); } - // Physical type is computed from the full data type, filtering out partition and - // metadata columns. This type is going to be used by formats to parse the input. - List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType); - if (metadataKeys != null && !metadataKeys.isEmpty()) { - // If metadata keys are present, then by SupportsReadingMetadata contract all the - // metadata columns will be at the end of the producedDataType, so we can just remove - // from the list the last metadataKeys.size() fields. - producedDataTypeFields = - producedDataTypeFields.subList( - 0, producedDataTypeFields.size() - metadataKeys.size()); - } - DataType physicalDataType = - producedDataTypeFields.stream() - .filter(f -> partitionKeys == null || !partitionKeys.contains(f.getName())) - .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW)); - // Resolve metadata and make sure to filter out metadata not in the producedDataType - List<String> metadataKeys = - (this.metadataKeys == null) ? Collections.emptyList() : this.metadataKeys; - metadataKeys = + final List<String> metadataKeys = DataType.getFieldNames(producedDataType).stream() - .filter(metadataKeys::contains) + .filter( + ((this.metadataKeys == null) + ? Collections.emptyList() + : this.metadataKeys) + ::contains) .collect(Collectors.toList()); - List<ReadableFileInfo> metadataToExtract = + final List<ReadableFileInfo> metadataToExtract = metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList()); // Filter out partition columns not in producedDataType - List<String> partitionKeysToExtract = + final List<String> partitionKeysToExtract = DataType.getFieldNames(producedDataType).stream() .filter(this.partitionKeys::contains) .collect(Collectors.toList()); + // Compute the physical projection and the physical data type, that is + // the type without partition columns and metadata in the same order of the schema + DataType physicalDataType = this.schema.toPhysicalRowDataType(); + final Projection partitionKeysProjections = + Projection.fromFieldNames(physicalDataType, partitionKeysToExtract); + final Projection physicalProjections = + (projectFields != null + ? 
Projection.of(projectFields) + : Projection.all(physicalDataType)) + .difference(partitionKeysProjections); + physicalDataType = + partitionKeysProjections + .complement(DataType.getFieldCount(physicalDataType)) Review comment: overload `complement` for this?
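A possible shape for that overload (just a sketch):

```java
/**
 * Like {@link #complement(int)}, but derives the number of fields from the given
 * (row) {@link DataType}.
 */
default Projection complement(DataType dataType) {
    return complement(DataType.getFieldCount(dataType));
}
```

The call site above would then read `partitionKeysProjections.complement(physicalDataType)`.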
