[
https://issues.apache.org/jira/browse/BEAM-981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16300313#comment-16300313
]
ASF GitHub Bot commented on BEAM-981:
-------------------------------------
asfgit closed pull request #4246: [BEAM-981] Add parameter allowing adding jars
to spark context
URL: https://github.com/apache/beam/pull/4246
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
new file mode 100644
index 00000000000..ae6b076acf2
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineResources.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import java.io.File;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Utilities for working with classpath resources for pipelines. */
+public class PipelineResources {
+
+ /**
+ * Attempts to detect all the resources the class loader has access to. This
does not recurse
+ * to class loader parents stopping it from pulling in resources from the
system class loader.
+ *
+ * @param classLoader The URLClassLoader to use to detect resources to stage.
+ * @throws IllegalArgumentException If either the class loader is not a
URLClassLoader or one
+ * of the resources the class loader exposes is not a file resource.
+ * @return A list of absolute paths to the resources the class loader uses.
+ */
+ public static List<String> detectClassPathResourcesToStage(ClassLoader
classLoader) {
+ if (!(classLoader instanceof URLClassLoader)) {
+ String message = String.format("Unable to use ClassLoader to detect
classpath elements. "
+ + "Current ClassLoader is %s, only URLClassLoaders are supported.",
classLoader);
+ throw new IllegalArgumentException(message);
+ }
+
+ List<String> files = new ArrayList<>();
+ for (URL url : ((URLClassLoader) classLoader).getURLs()) {
+ try {
+ files.add(new File(url.toURI()).getAbsolutePath());
+ } catch (IllegalArgumentException | URISyntaxException e) {
+ String message = String.format("Unable to convert url (%s) to file.",
url);
+ throw new IllegalArgumentException(message, e);
+ }
+ }
+ return files;
+ }
+}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
new file mode 100644
index 00000000000..633df01246f
--- /dev/null
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineResourcesTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+
+/**
+ * Tests for PipelineResources.
+ */
+@RunWith(JUnit4.class)
+public class PipelineResourcesTest {
+
+ @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder();
+ @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void detectClassPathResourceWithFileResources() throws Exception {
+ File file = tmpFolder.newFile("file");
+ File file2 = tmpFolder.newFile("file2");
+ URLClassLoader classLoader = new URLClassLoader(new URL[] {
+ file.toURI().toURL(),
+ file2.toURI().toURL()
+ });
+
+ assertEquals(ImmutableList.of(file.getAbsolutePath(),
file2.getAbsolutePath()),
+ PipelineResources.detectClassPathResourcesToStage(classLoader));
+ }
+
+ @Test
+ public void detectClassPathResourcesWithUnsupportedClassLoader() {
+ ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Unable to use ClassLoader to detect classpath
elements.");
+
+ PipelineResources.detectClassPathResourcesToStage(mockClassLoader);
+ }
+
+ @Test
+ public void detectClassPathResourceWithNonFileResources() throws Exception {
+ String url = "http://www.google.com/all-the-secrets.jar";
+ URLClassLoader classLoader = new URLClassLoader(new URL[] {
+ new URL(url)
+ });
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Unable to convert url (" + url + ") to file.");
+
+ PipelineResources.detectClassPathResourcesToStage(classLoader);
+ }
+}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 2432394f464..01f78473ed0 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -43,7 +43,6 @@
*/
@Description("Jar-Files to send to all workers and put on the classpath. "
+ "The default value is all files from the classpath.")
- @JsonIgnore
List<String> getFilesToStage();
void setFilesToStage(List<String> value);
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index ca12615be03..5fdcdcec121 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -17,14 +17,11 @@
*/
package org.apache.beam.runners.flink;
+import static
org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
import com.google.common.base.Joiner;
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -150,36 +147,7 @@ public String toString() {
return "FlinkRunner#" + hashCode();
}
- /**
- * Attempts to detect all the resources the class loader has access to. This
does not recurse
- * to class loader parents stopping it from pulling in resources from the
system class loader.
- *
- * @param classLoader The URLClassLoader to use to detect resources to stage.
- * @return A list of absolute paths to the resources the class loader uses.
- * @throws IllegalArgumentException If either the class loader is not a
URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
- */
- protected static List<String> detectClassPathResourcesToStage(
- ClassLoader classLoader) {
- if (!(classLoader instanceof URLClassLoader)) {
- String message = String.format("Unable to use ClassLoader to detect
classpath elements. "
- + "Current ClassLoader is %s, only URLClassLoaders are supported.",
classLoader);
- LOG.error(message);
- throw new IllegalArgumentException(message);
- }
- List<String> files = new ArrayList<>();
- for (URL url : ((URLClassLoader) classLoader).getURLs()) {
- try {
- files.add(new File(url.toURI()).getAbsolutePath());
- } catch (IllegalArgumentException | URISyntaxException e) {
- String message = String.format("Unable to convert url (%s) to file.",
url);
- LOG.error(message);
- throw new IllegalArgumentException(message, e);
- }
- }
- return files;
- }
/** A set of {@link View}s with non-deterministic key coders. */
Set<PTransform<?, ?>> ptransformViewsWithNonDeterministicKeyCoders;
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index a6500924149..025134477ed 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static
org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
@@ -40,12 +41,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Arrays;
@@ -253,9 +250,9 @@ public static DataflowRunner fromOptions(PipelineOptions
options) {
throw new IllegalArgumentException("No files to stage has been
found.");
} else {
LOG.info("PipelineOptions.filesToStage was not specified. "
- + "Defaulting to files from the classpath: will stage
{} files. "
- + "Enable logging at DEBUG level to see which files
will be staged.",
- dataflowOptions.getFilesToStage().size());
+ + "Defaulting to files from the classpath: will stage {}
files. "
+ + "Enable logging at DEBUG level to see which files will be
staged.",
+ dataflowOptions.getFilesToStage().size());
LOG.debug("Classpath elements: {}", dataflowOptions.getFilesToStage());
}
}
@@ -1478,36 +1475,6 @@ public String toString() {
return "DataflowRunner#" + options.getJobName();
}
- /**
- * Attempts to detect all the resources the class loader has access to. This
does not recurse
- * to class loader parents stopping it from pulling in resources from the
system class loader.
- *
- * @param classLoader The URLClassLoader to use to detect resources to stage.
- * @throws IllegalArgumentException If either the class loader is not a
URLClassLoader or one
- * of the resources the class loader exposes is not a file resource.
- * @return A list of absolute paths to the resources the class loader uses.
- */
- protected static List<String> detectClassPathResourcesToStage(ClassLoader
classLoader) {
- if (!(classLoader instanceof URLClassLoader)) {
- String message = String.format("Unable to use ClassLoader to detect
classpath elements. "
- + "Current ClassLoader is %s, only URLClassLoaders are supported.",
classLoader);
- LOG.error(message);
- throw new IllegalArgumentException(message);
- }
-
- List<String> files = new ArrayList<>();
- for (URL url : ((URLClassLoader) classLoader).getURLs()) {
- try {
- files.add(new File(url.toURI()).getAbsolutePath());
- } catch (IllegalArgumentException | URISyntaxException e) {
- String message = String.format("Unable to convert url (%s) to file.",
url);
- LOG.error(message);
- throw new IllegalArgumentException(message, e);
- }
- }
- return files;
- }
-
/**
* Finds the id for the running job of the given name.
*/
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 2239462ac2e..b02869b45de 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -199,7 +199,6 @@ public String create(PipelineOptions options) {
@Description("Files to stage on GCS and make available to workers. "
+ "Files are placed on the worker's classpath. "
+ "The default value is all files from the classpath.")
- @JsonIgnore
List<String> getFilesToStage();
void setFilesToStage(List<String> value);
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index edf513b7c94..90748aff329 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -53,8 +53,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
-import java.net.URL;
-import java.net.URLClassLoader;
import java.nio.channels.FileChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
@@ -627,40 +625,6 @@ public void runWithDefaultFilesToStage() throws Exception {
assertTrue(!options.getFilesToStage().isEmpty());
}
- @Test
- public void detectClassPathResourceWithFileResources() throws Exception {
- File file = tmpFolder.newFile("file");
- File file2 = tmpFolder.newFile("file2");
- URLClassLoader classLoader = new URLClassLoader(new URL[] {
- file.toURI().toURL(),
- file2.toURI().toURL()
- });
-
- assertEquals(ImmutableList.of(file.getAbsolutePath(),
file2.getAbsolutePath()),
- DataflowRunner.detectClassPathResourcesToStage(classLoader));
- }
-
- @Test
- public void detectClassPathResourcesWithUnsupportedClassLoader() {
- ClassLoader mockClassLoader = Mockito.mock(ClassLoader.class);
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Unable to use ClassLoader to detect classpath
elements.");
-
- DataflowRunner.detectClassPathResourcesToStage(mockClassLoader);
- }
-
- @Test
- public void detectClassPathResourceWithNonFileResources() throws Exception {
- String url = "http://www.google.com/all-the-secrets.jar";
- URLClassLoader classLoader = new URLClassLoader(new URL[] {
- new URL(url)
- });
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Unable to convert url (" + url + ") to file.");
-
- DataflowRunner.detectClassPathResourcesToStage(classLoader);
- }
-
@Test
public void testGcsStagingLocationInitialization() throws Exception {
// Set temp location (required), and check that staging location is set.
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
index 98f74923931..0a7995f744f 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkContextOptions.java
@@ -54,7 +54,7 @@
List<JavaStreamingListener> getListeners();
void setListeners(List<JavaStreamingListener> listeners);
- /** Returns an empty list, top avoid handling null. */
+ /** Returns an empty list, to avoid handling null. */
class EmptyListenersList implements
DefaultValueFactory<List<JavaStreamingListener>> {
@Override
public List<JavaStreamingListener> create(PipelineOptions options) {
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 26b549baa2e..2db82090cad 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.spark;
+import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -25,8 +26,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
-
-
/**
* Spark runner {@link PipelineOptions} handles Spark execution-related
configurations,
* such as the master address, batch-interval, and other user-related knobs.
@@ -101,4 +100,15 @@ public String create(PipelineOptions options) {
boolean getUsesProvidedSparkContext();
void setUsesProvidedSparkContext(boolean value);
+ /**
+ * List of local files to make available to workers.
+ *
+ * <p>Jars are placed on the worker's classpath.
+ *
+ * <p>The default value is the list of jars from the main program's
classpath.
+ */
+ @Description("Jar-Files to send to all workers and put on the classpath. "
+ + "The default value is all files from the classpath.")
+ List<String> getFilesToStage();
+ void setFilesToStage(List<String> value);
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 4a409cb9005..3495382e921 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,6 +18,8 @@
package org.apache.beam.runners.spark;
+import static
org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
+
import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.Collection;
@@ -121,6 +123,17 @@ public static SparkRunner create(SparkPipelineOptions
options) {
public static SparkRunner fromOptions(PipelineOptions options) {
SparkPipelineOptions sparkOptions =
PipelineOptionsValidator.validate(SparkPipelineOptions.class, options);
+
+ if (sparkOptions.getFilesToStage() == null) {
+ sparkOptions.setFilesToStage(detectClassPathResourcesToStage(
+ SparkRunner.class.getClassLoader()));
+ LOG.info("PipelineOptions.filesToStage was not specified. "
+ + "Defaulting to files from the classpath: will stage {} files. "
+ + "Enable logging at DEBUG level to see which files will be
staged.",
+ sparkOptions.getFilesToStage().size());
+ LOG.debug("Classpath elements: {}", sparkOptions.getFilesToStage());
+ }
+
return new SparkRunner(sparkOptions);
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 0132de3dc12..5a8ad2d4d2d 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -92,6 +92,11 @@ private static JavaSparkContext
createSparkContext(SparkContextOptions contextOp
// set master if not set.
conf.setMaster(contextOptions.getSparkMaster());
}
+
+ if (contextOptions.getFilesToStage() != null &&
!contextOptions.getFilesToStage().isEmpty()) {
+ conf.setJars(contextOptions.getFilesToStage().toArray(new String[0]));
+ }
+
conf.setAppName(contextOptions.getAppName());
// register immutable collections serializers because the SDK uses them.
conf.set("spark.kryo.registrator",
BeamSparkRunnerRegistrator.class.getName());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Not possible to directly submit a pipeline on spark cluster
> -----------------------------------------------------------
>
> Key: BEAM-981
> URL: https://issues.apache.org/jira/browse/BEAM-981
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Affects Versions: 0.6.0
> Reporter: Jean-Baptiste Onofré
> Assignee: Kobi Salant
>
> It's not possible to directly run a pipeline on the spark runner (for
> instance using {{mvn exec:java}}). It fails with:
> {code}
> [appclient-register-master-threadpool-0] INFO
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Connecting to
> master spark://10.200.118.197:7077...
> [shuffle-client-0] ERROR org.apache.spark.network.client.TransportClient -
> Failed to send RPC 6813731522650020739 to /10.200.118.197:7077:
> java.lang.AbstractMethodError:
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> java.lang.AbstractMethodError:
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
> at
> io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at
> io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at
> io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
> at
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101)
> at
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
> at
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
> at java.lang.Thread.run(Thread.java:745)
> [appclient-register-master-threadpool-0] WARN
> org.apache.spark.deploy.client.AppClient$ClientEndpoint - Failed to connect
> to master 10.200.118.197:7077
> java.io.IOException: Failed to send RPC 6813731522650020739 to
> /10.200.118.197:7077: java.lang.AbstractMethodError:
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
> at
> org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:514)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:507)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:486)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:427)
> at
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:129)
> at
> io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:845)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:750)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:826)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at
> io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:284)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:740)
> at
> io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
> at
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1101)
> at
> io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1148)
> at
> io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1090)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.safeExecute(SingleThreadEventExecutor.java:451)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:418)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:401)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.AbstractMethodError:
> org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
> at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
> at
> io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:820)
> at
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:733)
> at
> io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:748)
> ... 15 more
> {code}
> It looks like a conflict between the Netty version used in Spark and the one
> in Beam (just guessing).
> The workaround is to use {{spark-submit}}.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)