zhangjun0x01 commented on a change in pull request #1623: URL: https://github.com/apache/iceberg/pull/1623#discussion_r516381086
########## File path: flink/src/test/java/org/apache/iceberg/flink/actions/TestRewriteDataFilesAction.java ########## @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.iceberg.flink.actions; + +import java.io.IOException; +import java.util.List; +import org.apache.commons.lang3.StringUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.RewriteDataFilesActionResult; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.FlinkCatalogTestBase; +import org.apache.iceberg.flink.SimpleDataUtil; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import 
static org.apache.iceberg.flink.SimpleDataUtil.RECORD; + +@RunWith(Parameterized.class) +public class TestRewriteDataFilesAction extends FlinkCatalogTestBase { + + private static final String TABLE_NAME_UNPARTIITONED = "test_table_unpartitioned"; Review comment: I updated it ########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.actions; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BaseRewriteDataFilesAction; +import org.apache.iceberg.flink.source.RowDataRewriter; +import org.apache.iceberg.io.FileIO; + +public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> { + + private ExecutionEnvironment env; + + public RewriteDataFilesAction(ExecutionEnvironment env, Table table) { + super(table); + this.env = env; + } + + @Override + protected FileIO fileIO() { + return table().io(); + } + + @Override + protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) { + DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size()); Review comment: > Thanks @zhangjun0x01 for your contribution, why do you want to use `DataSet` API? `DataSet` API is being deprecated. > Looks like just need a `parallelize` and `map` and `collect`. I think `DataStream`(We can call it BoundedStream) can finish this. > What do you think? Hi @JingsongLi: I looked up the API of `DataStream`; it seems there is no `collect` method ########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.actions; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BaseRewriteDataFilesAction; +import org.apache.iceberg.flink.source.RowDataRewriter; +import org.apache.iceberg.io.FileIO; + +public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> { + + private ExecutionEnvironment env; + + public RewriteDataFilesAction(ExecutionEnvironment env, Table table) { + super(table); + this.env = env; + } + + @Override + protected FileIO fileIO() { + return table().io(); + } + + @Override + protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) { + DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size()); Review comment: I updated the PR to use DataStream instead of DataSet, and set the parallelism on the StreamExecutionEnvironment. But setParallelism for `env.fromCollection(combinedScanTasks)` does not seem to have an impact on the downstream operator; this is my test job graph using DataSet. 
https://blog.csdn.net/zhangjun5965/article/details/109469798 ########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.actions; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BaseRewriteDataFilesAction; +import org.apache.iceberg.flink.source.RowDataRewriter; +import org.apache.iceberg.io.FileIO; + +public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> { + + private ExecutionEnvironment env; + + public RewriteDataFilesAction(ExecutionEnvironment env, Table table) { + super(table); + this.env = env; + } + + @Override + protected FileIO fileIO() { + return table().io(); + } + + @Override + protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) { + DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size()); Review comment: If we do not set the parallelism, we can use the default parallelism of Flink. If the user wants to modify the parallelism to improve the execution speed, it can be set by the `-p` parameter. However, the user may not know how much parallelism to set, because users may not know how many data files to rewrite each time. If the setting is too large, it may cause a waste of resources. So I think we can set a max parallelism. If the size of the CombinedScanTask list is less than the max parallelism, we use the CombinedScanTask list size as the job parallelism; if the size is greater than the max parallelism, we use the max parallelism which we set. What do you think? ########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.actions; + +import java.util.List; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.BaseRewriteDataFilesAction; +import org.apache.iceberg.flink.source.RowDataRewriter; +import org.apache.iceberg.io.FileIO; + +public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> { + + private ExecutionEnvironment env; + + public RewriteDataFilesAction(ExecutionEnvironment env, Table table) { + super(table); + this.env = env; + } + + @Override + protected FileIO fileIO() { + return table().io(); + } + + @Override + protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) { + DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size()); Review comment: I updated the pr, although we set the parallelism for StreamExecutionEnvironment, I found that the DataStreamUtils.collect operator only uses 1 parallelism, so set parallelism to map operator or StreamExecutionEnvironment seems to have the same effect, but set parallelism to map is 
better to understand. I submitted an issue (#1660) to set max parallelism a few days ago; I will open a new PR for #1660 when this PR is merged ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
