JingsongLi commented on a change in pull request #1623:
URL: https://github.com/apache/iceberg/pull/1623#discussion_r516439114
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   I think `setParallelism` may not work here. What we want is the equivalent of `SparkContext.parallelize`, but this `setParallelism` only sets the operator's parallelism: the records still sit on a single node, so the downstream operators do all of their computation on one node. Can you check this?
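   For illustration only (not code from this PR), a rough sketch of what distributing the rewrite work could look like in the DataSet API, assuming a hypothetical `rewriteTask()` helper standing in for the per-task rewrite logic: the non-parallel collection source stays at parallelism 1, `rebalance()` spreads the records round-robin, and the parallelism is set on the rewrite operator only.

```java
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;

public class RewriteDistributionSketch {

  static DataSet<DataFile> rewriteInParallel(ExecutionEnvironment env,
                                             List<CombinedScanTask> combinedScanTasks) {
    return env.fromCollection(combinedScanTasks)       // non-parallel source: all records start on one subtask
        .rebalance()                                   // round-robin the tasks across downstream subtasks
        .flatMap(new FlatMapFunction<CombinedScanTask, DataFile>() {
          @Override
          public void flatMap(CombinedScanTask task, Collector<DataFile> out) {
            for (DataFile file : rewriteTask(task)) {  // hypothetical per-task rewrite
              out.collect(file);
            }
          }
        })
        .setParallelism(combinedScanTasks.size());     // parallelism on the rewrite step, not on the source
  }

  // Placeholder only; the real rewrite logic lives in the PR's RowDataRewriter.
  static List<DataFile> rewriteTask(CombinedScanTask task) {
    return Collections.emptyList();
  }
}
```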
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   You can use `DataStreamUtils.collect`.
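   A rough sketch of that suggestion, under a couple of assumptions: the `DataStreamUtils.collect(DataStream)` utility of the Flink version used here, and a hypothetical `rewriteTask()` placeholder where the PR's `RowDataRewriter` would do the actual work. The pipeline is executed and the rewritten files are pulled back to the client as an iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;

public class CollectResultsSketch {

  static List<DataFile> rewriteAndCollect(StreamExecutionEnvironment env,
                                          List<CombinedScanTask> combinedScanTasks) throws Exception {
    DataStream<List<DataFile>> rewritten = env
        .fromCollection(combinedScanTasks)
        .map(new MapFunction<CombinedScanTask, List<DataFile>>() {
          @Override
          public List<DataFile> map(CombinedScanTask task) {
            return rewriteTask(task);                  // hypothetical per-task rewrite
          }
        });

    // DataStreamUtils.collect runs the pipeline and streams the results back to the client.
    List<DataFile> addedDataFiles = new ArrayList<>();
    Iterator<List<DataFile>> results = DataStreamUtils.collect(rewritten);
    while (results.hasNext()) {
      addedDataFiles.addAll(results.next());
    }
    return addedDataFiles;
  }

  // Placeholder only; the real rewrite logic lives in the PR's RowDataRewriter.
  static List<DataFile> rewriteTask(CombinedScanTask task) {
    return new ArrayList<>();
  }
}
```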
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   Thanks for the update and the check. I think you are right; the graph looks good.

########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   I thought about the parallelism again. Can we avoid setting it? Flink uses a push-based model, which is not well suited to very high parallelism. So can we just leave it at the default parallelism?
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   Let me explain why we can't set the parallelism directly: Flink runs in pipelined mode by default, which means that all tasks must be started at the same time. If we want to compact a large table, that may require a parallelism of thousands, which in turn requires a very large cluster. This is similar to the Hive source parallelism inference:
   https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_read_write.html#source-parallelism-inference
   The task count should be used to infer the parallelism, but we also need a maximum parallelism. So I agree with you very much: let's add a max parallelism.
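   A small sketch of that capping idea; `maxParallelism` here is a hypothetical knob, not an option defined in this PR.

```java
// Derive the operator parallelism from the number of combined tasks, but never
// exceed a configured maximum, so compacting a huge table does not require a
// cluster with thousands of slots. `maxParallelism` is an assumed option.
static int inferRewriteParallelism(int taskCount, int maxParallelism) {
  return Math.max(1, Math.min(taskCount, maxParallelism));
}

// Usage sketch: rewriteOperator.setParallelism(inferRewriteParallelism(combinedScanTasks.size(), 100));
```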
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   BTW, you should set the parallelism after `map` instead of modifying the env.

########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   BTW, you should set the parallelism on the `map` operation instead of modifying the env.
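   A sketch of the difference, with `rewriteFunction` standing in for the PR's map function: the environment's default parallelism is left untouched, and the explicit parallelism is scoped to the rewrite operator only.

```java
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;

public class OperatorParallelismSketch {

  static DataStream<List<DataFile>> buildRewriteStream(
      StreamExecutionEnvironment env,
      List<CombinedScanTask> combinedScanTasks,
      MapFunction<CombinedScanTask, List<DataFile>> rewriteFunction,
      int parallelism) {
    // Avoid env.setParallelism(parallelism): it changes the default for every
    // operator and leaks into an environment the caller may reuse elsewhere.
    return env.fromCollection(combinedScanTasks)
        .map(rewriteFunction)
        .setParallelism(parallelism);                  // applies to the rewrite operator only
  }
}
```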
########## File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java ##########
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
   > although we set the parallelism for StreamExecutionEnvironment, I found that the DataStreamUtils.collect operator only uses 1 parallelism, so set parallelism to map operator or StreamExecutionEnvironment seems to have the same effect, but set parallelism to map is better to understand.

   It is better not to bring extra impact to the env, because this env may be reused by users.

   > I submit an issue (#1660) to set max parallelism a few days ago, I will open a new pr for #1660 when this PR is merged

   Can you make that change in this PR? Or just not set the parallelism in this PR at all? I think it is not good to bring cluster risk for users into the master code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
