hililiwei commented on code in PR #3991:
URL: https://github.com/apache/iceberg/pull/3991#discussion_r853696393
##########
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java:
##########
@@ -60,11 +76,37 @@ public static RowDataProjection create(Schema schema, Schema projectedSchema) {
* @return a wrapper to project rows
*/
public static RowDataProjection create(RowType rowType, Types.StructType schema, Types.StructType projectedSchema) {
- return new RowDataProjection(rowType, schema, projectedSchema);
+ return RowDataProjection.create(rowType, schema, projectedSchema, null);
+ }
+
+ /**
+ * Creates a projecting wrapper for {@link RowData} rows.
+ * <p>
+ * This projection will not project the nested children types of repeated types like lists and maps.
+ *
+ * @param rowType flink row type of rows wrapped by this projection
+ * @param schema schema of rows wrapped by this projection
+ * @param projectedSchema result schema of the projected rows
+ * @param projectedFields mapping of nested fields
+ * @return a wrapper to project rows
+ */
+ public static RowDataProjection create(RowType rowType,
+ Types.StructType schema,
+ Types.StructType projectedSchema,
+ int[][] projectedFields) {
Review Comment:
This question has a similar focus to the previous one, so let me try to
explain more clearly why the `projectedFields` field is required.
When nested projections are used to drill into the original row data, Flink's
output schema is not equivalent to Iceberg's projected schema, as in the case
of the comment https://github.com/apache/iceberg/pull/3991#discussion_r849480029
above.
When `st.f0`, `id`, and `st.f1` are used to pull data, the projected schema that
Iceberg uses to pull data from the original file is (call it `schema1`):
```
id: bigint
st: struct {
  f0: bigint,
  f1: bigint
}
```
while the source output schema (`TypeInformation<RowData>`) of Flink is
(call it `schema2`):
```
st.f0: bigint
st.f1: bigint
id: bigint
```
A mapping is needed to convert the data from `schema1` to `schema2` when the
Flink source emits it. This mapping is `projectedFields` (value `{ {1,0},
{1,1}, {0} }`).
For example, take the `st.f0: bigint` field. When Iceberg pulls data with
`schema1`, the field's path in `projectedFields` is `{1,0}`:
we first take the struct at position 1 (`st: struct`) and assign it to `value`,
then, in the next loop iteration, take the bigint at position 0 within that
struct (`f0: bigint`) and assign it to `value` as well. That final `value` is
the result we want.
This is why `value` is replaced in the loop.
Without this mapping, the values, types, and order of the fields output by
Flink are incorrect.
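To make the path walk concrete, here is a minimal, self-contained sketch. It is not the code in this PR: rows are modeled as plain `Object[]` arrays instead of Flink's `RowData`, and the names `NestedProjectionSketch`, `getByPath`, and `project` are hypothetical, chosen only for illustration. It shows how each path in `projectedFields` is followed from the `schema1` row down to a leaf, with `value` replaced at every step.

```java
import java.util.Arrays;

// Illustrative only: Object[] stands in for a row or struct; this is not Flink's RowData API.
public class NestedProjectionSketch {

  // Follows one path of positions from the top-level row down to a leaf value.
  static Object getByPath(Object[] row, int[] path) {
    Object value = row;
    for (int pos : path) {
      if (value == null) {
        return null; // a null parent struct yields a null leaf
      }
      // value is replaced on every step: first the struct, then the field inside it
      value = ((Object[]) value)[pos];
    }
    return value;
  }

  // Builds the flat output row (schema2 order) from a nested input row (schema1 order).
  static Object[] project(Object[] row, int[][] projectedFields) {
    Object[] out = new Object[projectedFields.length];
    for (int i = 0; i < projectedFields.length; i++) {
      out[i] = getByPath(row, projectedFields[i]);
    }
    return out;
  }

  public static void main(String[] args) {
    // schema1 layout: position 0 = id, position 1 = st {f0, f1}
    Object[] row = {7L, new Object[] {10L, 20L}};
    // schema2 order: st.f0, st.f1, id
    int[][] projectedFields = {{1, 0}, {1, 1}, {0}};
    System.out.println(Arrays.toString(project(row, projectedFields)));
    // prints [10, 20, 7]
  }
}
```

With the sample row above, the path `{1,0}` first selects the `st` struct and then `f0` inside it, which is why the first output column is 10 rather than the `id` value.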