JingsongLi commented on a change in pull request #1623:
URL: https://github.com/apache/iceberg/pull/1623#discussion_r516439114



##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       I think `setParallelism` may not work here.
   What we want is something like `SparkContext.parallelize`, but this `setParallelism` only sets the parallelism; the records still sit on a single node, so downstream operators would do all of the computation on one node.
   Can you check this?
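
For readers following the thread, a minimal, generic DataSet sketch of the distinction being raised is shown below. Plain integers stand in for the combined scan tasks and the explicit `rebalance()` is only there to make the redistribution visible; none of this is the PR's actual code.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelizeSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    List<Integer> tasks = Arrays.asList(1, 2, 3, 4);

    DataSet<String> results = env.fromCollection(tasks)   // collection source is produced on one node
        .rebalance()                                      // round-robin the records to all subtasks
        .map(new MapFunction<Integer, String>() {
          @Override
          public String map(Integer task) {
            return "rewrote task " + task;
          }
        })
        .setParallelism(tasks.size());                    // parallelism scoped to the map operator

    results.print();                                      // print() triggers execution in the DataSet API
  }
}
```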

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       You can use `DataStreamUtils.collect`
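
A hedged sketch of how `DataStreamUtils.collect` can bring job results back to the client as a `List`. The toy string stream and class name below are illustrative only, not the PR's implementation; newer Flink releases expose `DataStream#executeAndCollect` for the same purpose.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CollectSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> rewritten = env
        .fromCollection(Arrays.asList("file-1", "file-2", "file-3"))
        .map(new MapFunction<String, String>() {
          @Override
          public String map(String file) {
            return "rewrote " + file;
          }
        });

    // DataStreamUtils.collect submits the job and streams the results back to the client.
    Iterator<String> iterator = DataStreamUtils.collect(rewritten);
    List<String> results = new ArrayList<>();
    iterator.forEachRemaining(results::add);
    System.out.println(results);
  }
}
```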

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       Thanks for the update and the check. I think you are right, the graph is good.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       I thought about the parallelism again.
   Can we avoid setting the parallelism? Flink is a push-based model, which is not well suited to a very high parallelism.
   So can we just leave it at the default parallelism?

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       Let me explain why we can't set the parallelism directly:
   Flink runs in pipelined mode by default, which means that all tasks must be started at the same time.
   If we want to compact a large table, it may require thousands of parallel subtasks, which in turn requires a very large cluster.
   
   This is similar to the Hive source parallelism inference: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/hive/hive_read_write.html#source-parallelism-inference
   The number of tasks should be used to infer the parallelism, but we also need a max parallelism to cap it.
   
   So I agree with you very much. Let's add a max parallelism.
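
A minimal sketch of the "infer, then cap" idea under discussion: derive the parallelism from the number of combined scan tasks, but clamp it with a configurable maximum. The `maxParallelism` parameter and the helper's name are hypothetical and used only for illustration.

```java
// Hypothetical helper (not part of the PR): infer the rewrite parallelism from the
// number of combined scan tasks, but never exceed a user-configured maximum.
static int inferRewriteParallelism(int numCombinedScanTasks, int maxParallelism) {
  return Math.max(1, Math.min(numCombinedScanTasks, maxParallelism));
}
```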

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       BTW, you should set parallelism after `map` instead of modifying env.
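
A hedged sketch of that suggestion in the shape of the method under review: the parallelism is attached to the map operator, so a user-supplied `ExecutionEnvironment` keeps its own defaults. The `rewriteMap` parameter is an assumed stand-in for the real rewrite function, and `org.apache.flink.api.common.functions.MapFunction` would need to be imported in addition to the file's existing imports.

```java
// Sketch under assumptions, not the PR's code.
private static List<DataFile> rewriteDataForTasks(
    ExecutionEnvironment env,
    List<CombinedScanTask> combinedScanTasks,
    MapFunction<CombinedScanTask, DataFile> rewriteMap) throws Exception {
  return env.fromCollection(combinedScanTasks)
      .map(rewriteMap)                              // the actual rewriting, passed in for the sketch
      .setParallelism(combinedScanTasks.size())     // scoped to this map operator; env is untouched
      .collect();                                   // DataSet#collect brings the DataFiles back to the client
}
```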

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       BTW, you should set the parallelism on the `map` operation instead of modifying the env.

##########
File path: flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.BaseRewriteDataFilesAction;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDataFilesAction> {
+
+  private ExecutionEnvironment env;
+
+  public RewriteDataFilesAction(ExecutionEnvironment env, Table table) {
+    super(table);
+    this.env = env;
+  }
+
+  @Override
+  protected FileIO fileIO() {
+    return table().io();
+  }
+
+  @Override
+  protected List<DataFile> rewriteDataForTasks(List<CombinedScanTask> combinedScanTasks) {
+    DataSet<CombinedScanTask> dataSet = env.fromCollection(combinedScanTasks).setParallelism(combinedScanTasks.size());

Review comment:
       > although we set the parallelism for StreamExecutionEnvironment, I found that the DataStreamUtils.collect operator only uses a parallelism of 1, so setting the parallelism on the map operator or on the StreamExecutionEnvironment seems to have the same effect, but setting it on the map operator is easier to understand.
   
   It is better not to bring an extra impact to the env, because this env may be reused by users.
   
   > I submitted an issue (#1660) to set a max parallelism a few days ago; I will open a new PR for #1660 when this PR is merged
   
   Can you modify it in this PR? Or just not set the parallelism in this PR?
   I think it is not good to bring a cluster risk to users in the master code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



