nssalian commented on code in PR #15596:
URL: https://github.com/apache/iceberg/pull/15596#discussion_r3213897894
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java:
##########
@@ -262,4 +262,22 @@ public Duration tableRefreshInterval() {
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
.parseOptional();
}
+
+ public boolean parquetShredVariants() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.PARQUET_SHRED_VARIANTS.key())
+ .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
+ .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
+ .parse();
+ }
+
+ public int parquetVariantInferenceBufferSize() {
+ return confParser
+ .intConf()
+ .option(FlinkWriteOptions.PARQUET_VARIANT_INFERENCE_BUFFER_SIZE.key())
Review Comment:
Could you add these to the Flink documentation, similar to what #14297 did
for Spark?
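
For the docs, the resolution order these two getters imply could be sketched roughly as below. This is only an illustration, assuming the parser checks the write option first, then the table property, then the default. The class name, the `resolve` helper, and the table property key are hypothetical; only the `parquet-shred-variants` option key comes from this PR.

```java
import java.util.Map;

public class ShredConfPrecedence {
  // Write option key as declared in FlinkWriteOptions in this PR.
  static final String WRITE_OPTION_KEY = "parquet-shred-variants";
  // Hypothetical placeholder; the real key lives in TableProperties.
  static final String TABLE_PROPERTY_KEY = "hypothetical.table.property";

  // Assumed precedence: write option, then table property, then default.
  static boolean resolve(
      Map<String, String> writeOptions, Map<String, String> tableProps, boolean defaultValue) {
    String value = writeOptions.get(WRITE_OPTION_KEY);
    if (value == null) {
      value = tableProps.get(TABLE_PROPERTY_KEY);
    }
    return value != null ? Boolean.parseBoolean(value) : defaultValue;
  }
}
```

Under that assumption, a per-job write option would override whatever is set on the table, which is probably the behavior worth calling out in the docs.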
##########
parquet/src/test/java/org/apache/iceberg/parquet/TestParquetDataWriter.java:
##########
@@ -370,7 +370,7 @@ protected int resolveColumnIndex(Void engineSchema, String columnName) {
(icebergSchema, fileSchema, engineSchema, idToConstant) ->
GenericParquetReaders.buildReader(icebergSchema, fileSchema),
testAnalyzer,
- record -> record);
+ unused -> oriRecord -> oriRecord);
Review Comment:
What was this change for?
##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetFormatModel.java:
##########
@@ -51,7 +51,7 @@ public class ParquetFormatModel<D, S, R>
extends BaseFormatModel<D, S, ParquetValueWriter<?>, R, MessageType> {
private final boolean isBatchReader;
private final VariantShreddingAnalyzer<D, S> variantAnalyzer;
- private final UnaryOperator<D> copyFunc;
+ private final Function<S, UnaryOperator<D>> copyFuncFactory;
Review Comment:
Based on this comment thread, we decided to keep the UnaryOperator:
https://github.com/apache/iceberg/pull/14297#discussion_r3015349863. @pvary
suggested BiFunction too? It is worth exploring which approach works best, or
we can keep it as a UnaryOperator.
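
To make the comparison concrete, here is a rough sketch of the three shapes side by side. It is not the ParquetFormatModel code: the class name is made up, an `Integer` field count stands in for the engine schema `S`, and an `int[]` stands in for the record type `D`.

```java
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class CopyFuncShapes {
  // Shape 1: plain UnaryOperator<D>; the copy needs no schema.
  static final UnaryOperator<int[]> PLAIN = record -> record.clone();

  // Shape 2: Function<S, UnaryOperator<D>>; the schema is bound once,
  // and the returned operator is reused for every record.
  static final Function<Integer, UnaryOperator<int[]>> FACTORY =
      fieldCount -> record -> Arrays.copyOf(record, fieldCount);

  // Shape 3: BiFunction<S, D, D>; the schema is passed on every call.
  static final BiFunction<Integer, int[], int[]> BI =
      (fieldCount, record) -> Arrays.copyOf(record, fieldCount);

  // Helper showing how a caller would use the factory shape.
  static int[] viaFactory(int fieldCount, int[] record) {
    return FACTORY.apply(fieldCount).apply(record);
  }
}
```

The factory shape lets any schema-dependent setup happen once per writer, while the BiFunction shape is simpler to read but repeats that work per record. If the copy never needs the schema, the plain UnaryOperator is enough.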
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java:
##########
@@ -262,4 +262,22 @@ public Duration tableRefreshInterval() {
.flinkConfig(FlinkWriteOptions.TABLE_REFRESH_INTERVAL)
.parseOptional();
}
+
+ public boolean shredVariants() {
+ return confParser
+ .booleanConf()
+ .option(FlinkWriteOptions.SHRED_VARIANTS.key())
+ .tableProperty(TableProperties.PARQUET_SHRED_VARIANTS)
+ .defaultValue(TableProperties.PARQUET_SHRED_VARIANTS_DEFAULT)
Review Comment:
Let's do parquet for now since we followed that pattern for the Spark
implementation.
##########
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java:
##########
@@ -105,4 +105,10 @@ private FlinkWriteOptions() {}
// specify the uidSuffix to be used for the underlying IcebergSink
public static final ConfigOption<String> UID_SUFFIX =
ConfigOptions.key("uid-suffix").stringType().defaultValue("");
+
+ public static final ConfigOption<Boolean> PARQUET_SHRED_VARIANTS =
+ ConfigOptions.key("parquet-shred-variants").booleanType().defaultValue(false);
+
+ public static final ConfigOption<Integer> PARQUET_VARIANT_INFERENCE_BUFFER_SIZE =
Review Comment:
Could you rename this to VARIANT_INFERENCE_BUFFER_SIZE to be consistent with
Spark?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]