Author: zly
Date: Mon Mar 20 21:59:36 2017
New Revision: 1787852
URL: http://svn.apache.org/viewvc?rev=1787852&view=rev
Log:
PIG-5187: UdfDistributedCache_1 is failing with spark exec type (Nandor via
Liyun)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1787852&r1=1787851&r2=1787852&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Mon Mar 20 21:59:36 2017
@@ -232,7 +232,7 @@ public class SparkLauncher extends Launc
}
private void uploadResources(SparkOperPlan sparkPlan) throws IOException {
- addFilesToSparkJob();
+ addFilesToSparkJob(sparkPlan); // the plan is passed so files requested by its UDFs are shipped/cached too
addJarsToSparkJob(sparkPlan);
}
@@ -350,7 +350,7 @@ public class SparkLauncher extends Launc
}
}
- private void addFilesToSparkJob() throws IOException {
+ private void addFilesToSparkJob(SparkOperPlan sparkPlan) throws
IOException {
LOG.info("add files Spark Job");
String shipFiles = pigContext.getProperties().getProperty(
"pig.streaming.ship.files");
@@ -358,12 +358,22 @@ public class SparkLauncher extends Launc
String cacheFiles = pigContext.getProperties().getProperty(
"pig.streaming.cache.files");
cacheFiles(cacheFiles);
+ addUdfResourcesToSparkJob(sparkPlan);
}
+ private void addUdfResourcesToSparkJob(SparkOperPlan sparkPlan) throws
IOException { // Hands the ship/cache files collected from the plan's UDFs to shipFiles()/cacheFiles().
+ SparkPOUserFuncVisitor sparkPOUserFuncVisitor = new
SparkPOUserFuncVisitor(sparkPlan);
+ sparkPOUserFuncVisitor.visit(); // walks the plan and fills the visitor's ship and cache file sets
+ Joiner joiner = Joiner.on(","); // comma-separated, the same delimiter shipFiles()/cacheFiles() split on
+ String shipFiles = joiner.join(sparkPOUserFuncVisitor.getShipFiles());
+ shipFiles(shipFiles); // NOTE(review): an empty set presumably joins to "", which the isEmpty() guard in shipFiles() skips
+ String cacheFiles =
joiner.join(sparkPOUserFuncVisitor.getCacheFiles());
+ cacheFiles(cacheFiles); // likewise guarded by the isEmpty() check in cacheFiles()
+ }
private void shipFiles(String shipFiles)
throws IOException {
- if (shipFiles != null) {
+ if (shipFiles != null && !shipFiles.isEmpty()) {
for (String file : shipFiles.split(",")) {
File shipFile = new File(file.trim());
if (shipFile.exists()) {
@@ -376,7 +386,7 @@ public class SparkLauncher extends Launc
}
private void cacheFiles(String cacheFiles) throws IOException {
- if (cacheFiles != null) {
+ if (cacheFiles != null && !cacheFiles.isEmpty()) {
File tmpFolder = Files.createTempDirectory("cache").toFile();
tmpFolder.deleteOnExit();
for (String file : cacheFiles.split(",")) {
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java?rev=1787852&view=auto
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
(added)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPOUserFuncVisitor.java
Mon Mar 20 21:59:36 2017
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
+import
org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class SparkPOUserFuncVisitor extends SparkOpPlanVisitor { // Collects the UDF cache and ship files of every operator in a Spark plan.
+ private Set<String> cacheFiles = new HashSet<>(); // union of getCacheFiles() results over all visited operators
+ private Set<String> shipFiles = new HashSet<>(); // union of getShipFiles() results over all visited operators
+ /** Creates a visitor that traverses {@code plan} with a {@link DepthFirstWalker}. */
+ public SparkPOUserFuncVisitor(SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<>(plan));
+ }
+ /** Runs a UdfCacheShipFilesVisitor over the operator's physical plan and merges its results into this visitor's sets. */
+ @Override
+ public void visitSparkOp(SparkOperator sparkOperator) throws
VisitorException {
+ if(!sparkOperator.physicalPlan.isEmpty()) { // operators with an empty physical plan are skipped
+ UdfCacheShipFilesVisitor udfCacheFileVisitor = new
UdfCacheShipFilesVisitor(sparkOperator.physicalPlan);
+ udfCacheFileVisitor.visit();
+ cacheFiles.addAll(udfCacheFileVisitor.getCacheFiles()); // Set semantics de-duplicate files seen in several operators
+ shipFiles.addAll(udfCacheFileVisitor.getShipFiles());
+ }
+ }
+ /** Returns the internal set of cache files collected so far (the live set, not a copy). */
+ public Set<String> getCacheFiles() {
+ return cacheFiles;
+ }
+ /** Returns the internal set of ship files collected so far (the live set, not a copy). */
+ public Set<String> getShipFiles() {
+ return shipFiles;
+ }
+}