This is an automated email from the ASF dual-hosted git repository. spmallette pushed a commit to branch TINKERPOP-2076 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
commit ec660c784340e2fd01c0fc350aed502d4927e52d Author: stephen <spmalle...@gmail.com> AuthorDate: Tue Nov 12 04:13:49 2019 -0500 TINKERPOP-2076 Bump to spark 3.0 with jdk11 support --- CHANGELOG.asciidoc | 3 +- gremlin-groovy/pom.xml | 2 +- pom.xml | 2 +- spark-gremlin/pom.xml | 254 ++++----------------- .../spark/process/computer/MemoryAccumulator.java | 47 ++-- .../spark/process/computer/SparkMemory.java | 16 +- 6 files changed, 93 insertions(+), 231 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9f3b9bc..9abe36f 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -47,7 +47,8 @@ This release also includes changes from <<release-3-4-3, 3.4.3>>. * Configured GraphBinary as the default binary serialization format for the Java Driver. * Configured GraphSON 3.0 as the default text serialization format when no serializer can be determined. * Configured GraphSON 3.0 as the default setting for the `GraphSONMapper`. -* Bump to Neo4j 3.4.11. +* Bumped to Neo4j 3.4.11. +* Bumped to Spark 3.0.0. * Added a parameterized `TypeTranslator` for use with `GroovyTranslator` that should produce more cache hits. * Added support for `TextP` in Neo4j using its string search functions. * Changed `TraversalStrategy` application methodology to apply each strategy in turn to each level of the traversal hierarchy starting from root down to children. diff --git a/gremlin-groovy/pom.xml b/gremlin-groovy/pom.xml index c4373b4..b890a99 100644 --- a/gremlin-groovy/pom.xml +++ b/gremlin-groovy/pom.xml @@ -34,7 +34,7 @@ limitations under the License. <dependency> <groupId>org.apache.ivy</groupId> <artifactId>ivy</artifactId> - <version>2.3.0</version> + <version>2.4.0</version> </dependency> <dependency> <groupId>org.codehaus.groovy</groupId> diff --git a/pom.xml b/pom.xml index 61aa457..bf22855 100644 --- a/pom.xml +++ b/pom.xml @@ -159,7 +159,7 @@ limitations under the License. 
<netty.version>4.1.49.Final</netty.version> <slf4j.version>1.7.25</slf4j.version> <snakeyaml.version>1.15</snakeyaml.version> - <spark.version>2.4.0</spark.version> + <spark.version>3.0.0-preview</spark.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml index 695d743..0d7a64d 100644 --- a/spark-gremlin/pom.xml +++ b/spark-gremlin/pom.xml @@ -32,7 +32,7 @@ <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> - <version>14.0.1</version> + <version>16.0.1</version> </dependency> <dependency> <groupId>org.apache.tinkerpop</groupId> @@ -44,257 +44,105 @@ <artifactId>hadoop-gremlin</artifactId> <version>${project.version}</version> <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>javax.servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </exclusion> - <exclusion> - <groupId>commons-net</groupId> - <artifactId>commons-net</artifactId> - </exclusion> - <exclusion> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> + <!-- use our snappy as there is conflict within spark--> <exclusion> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> </exclusion> + <!-- use spark's avro --> <exclusion> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> </exclusion> + <!-- use spark's math --> <exclusion> <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> </exclusion> + <!-- use spark's netty 4--> 
<exclusion> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-all</artifactId> </exclusion> + <!-- use spark's activation --> + <exclusion> + <groupId>javax.activation</groupId> + <artifactId>activation</artifactId> + </exclusion> + <!-- use zookeeper's netty 3 --> <exclusion> <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> + <artifactId>netty</artifactId> </exclusion> + <!-- use spark's commons-compress --> <exclusion> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> </exclusion> </exclusions> </dependency> <!-- SPARK --> <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> + <artifactId>spark-core_2.12</artifactId> <version>${spark.version}</version> <exclusions> - <!-- self conflicts --> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang.modules</groupId> - <artifactId>scala-xml_2.11</artifactId> - </exclusion> - <exclusion> - <groupId>javax.activation</groupId> - <artifactId>activation</artifactId> - </exclusion> <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> </exclusion> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> 
<exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpcore</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> </exclusion> <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy</artifactId> </exclusion> <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> </exclusion> <exclusion> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> </exclusion> - <exclusion> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - </exclusion> - <exclusion> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <!-- gremlin-core conflicts --> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.ivy</groupId> - 
<artifactId>ivy</artifactId> - </exclusion> - <!-- gremlin-groovy conflicts --> - <exclusion> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - </exclusion> - <!-- hadoop conflicts --> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.curator</groupId> - <artifactId>curator-client</artifactId> - </exclusion> - <!-- lgpl conflicts --> - <exclusion> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - </exclusion> - <!-- avro conflicts --> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </exclusion> </exclusions> </dependency> - <!-- consistent dependencies --> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>2.11.8</version> - </dependency> - <dependency> - <groupId>org.scala-lang.modules</groupId> - <artifactId>scala-xml_2.11</artifactId> - <version>1.0.5</version> - <exclusions> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>2.6.7</version> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>${commons.lang.version}</version> - </dependency> - <dependency> - <groupId>com.thoughtworks.paranamer</groupId> - <artifactId>paranamer</artifactId> - <version>2.6</version> - </dependency> + <!-- spark self-conflict and hadoop conflict --> <dependency> <groupId>org.xerial.snappy</groupId> <artifactId>snappy-java</artifactId> - <version>1.1.1.7</version> + <version>1.1.7.3</version> 
</dependency> + <!-- spark self-conflict --> <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - <version>4.1.32.Final</version> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.12.10</version> </dependency> + <!-- spark self-conflict --> <dependency> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - <version>3.9.9.Final</version> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.10.0</version> </dependency> + <!-- spark self-conflict --> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - <version>1.19</version> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.6</version> + <exclusions> + <!-- use gremlin-groovy's jline --> + <exclusion> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + </exclusion> + </exclusions> </dependency> <!-- TEST --> <dependency> @@ -302,16 +150,6 @@ <artifactId>gremlin-test</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>org.objenesis</groupId> - <artifactId>objenesis</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.tinkerpop</groupId> diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java index cf8cb25..cc7b8de 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/MemoryAccumulator.java @@ -19,37 +19,58 @@ package org.apache.tinkerpop.gremlin.spark.process.computer; -import 
org.apache.spark.AccumulatorParam; +import org.apache.spark.util.AccumulatorV2; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; /** * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Stephen Mallette (http://stephen.genoprime.com) */ -public final class MemoryAccumulator<A> implements AccumulatorParam<ObjectWritable<A>> { +public final class MemoryAccumulator<A> extends AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> { private final MemoryComputeKey<A> memoryComputeKey; + private ObjectWritable<A> value; - public MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) { + MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey) { + this(memoryComputeKey, ObjectWritable.empty()); + } + + private MemoryAccumulator(final MemoryComputeKey<A> memoryComputeKey, final ObjectWritable<A> initial) { this.memoryComputeKey = memoryComputeKey; + this.value = initial; + } + + @Override + public boolean isZero() { + return ObjectWritable.empty().equals(value); + } + + @Override + public AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> copy() { + return new MemoryAccumulator<>(this.memoryComputeKey, this.value); + } + + @Override + public void reset() { + this.value = ObjectWritable.empty(); } @Override - public ObjectWritable<A> addAccumulator(final ObjectWritable<A> a, final ObjectWritable<A> b) { - if (a.isEmpty()) - return b; - if (b.isEmpty()) - return a; - return new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(a.get(), b.get())); + public void add(final ObjectWritable<A> v) { + if (this.value.isEmpty()) + this.value = v; + if (!v.isEmpty()) + this.value = new ObjectWritable<>(this.memoryComputeKey.getReducer().apply(value.get(), v.get())); } @Override - public ObjectWritable<A> addInPlace(final ObjectWritable<A> a, final ObjectWritable<A> b) { - return this.addAccumulator(a, b); + public void merge(final 
AccumulatorV2<ObjectWritable<A>, ObjectWritable<A>> other) { + this.add(other.value()); } @Override - public ObjectWritable<A> zero(final ObjectWritable<A> a) { - return ObjectWritable.empty(); + public ObjectWritable<A> value() { + return this.value; } } diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java index bf8590e..5a04162 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.gremlin.spark.process.computer; -import org.apache.spark.Accumulator; +import org.apache.spark.util.AccumulatorV2; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; @@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicLong; public final class SparkMemory implements Memory.Admin, Serializable { public final Map<String, MemoryComputeKey> memoryComputeKeys = new HashMap<>(); - private final Map<String, Accumulator<ObjectWritable>> sparkMemory = new HashMap<>(); + private final Map<String, AccumulatorV2<ObjectWritable,ObjectWritable>> sparkMemory = new HashMap<>(); private final AtomicInteger iteration = new AtomicInteger(0); private final AtomicLong runtime = new AtomicLong(0l); private Broadcast<Map<String, Object>> broadcast; @@ -62,9 +62,9 @@ public final class SparkMemory implements Memory.Admin, Serializable { this.memoryComputeKeys.put(mapReduce.getMemoryKey(), MemoryComputeKey.of(mapReduce.getMemoryKey(), Operator.assign, false, false)); } for (final MemoryComputeKey memoryComputeKey : this.memoryComputeKeys.values()) { - this.sparkMemory.put( - memoryComputeKey.getKey(), - 
sparkContext.accumulator(ObjectWritable.empty(), memoryComputeKey.getKey(), new MemoryAccumulator<>(memoryComputeKey))); + final AccumulatorV2<ObjectWritable, ObjectWritable> accumulator = new MemoryAccumulator<>(memoryComputeKey); + JavaSparkContext.toSparkContext(sparkContext).register(accumulator, memoryComputeKey.getKey()); + this.sparkMemory.put(memoryComputeKey.getKey(), accumulator); } this.broadcast = sparkContext.broadcast(Collections.emptyMap()); } @@ -135,8 +135,10 @@ public final class SparkMemory implements Memory.Admin, Serializable { checkKeyValue(key, value); if (this.inExecute) throw Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate(key); - else - this.sparkMemory.get(key).setValue(new ObjectWritable<>(value)); + else { + this.sparkMemory.get(key).reset(); + this.sparkMemory.get(key).add(new ObjectWritable<>(value)); + } } @Override