Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1389 532ed59c2 -> 49924cc66
KryoShimService is no longer a META-INF service with priorities and all that. Instead, it is now selected through a configuration property. If the user-provided configuration DOES NOT have gremlin.io.kryoShimService, then it defaults to HadoopPoolShimService. However, for SparkGraphComputer, if KryoSerializer is being used (and the configured shim is absent or the HadoopPoolShimService default), then the kryoShimService is set to UnshadedKryoShimService (which is a Spark-specific shim). This seems much cleaner and less error-prone. Will need @dalaro to 'okay' the model. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/49924cc6 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/49924cc6 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/49924cc6 Branch: refs/heads/TINKERPOP-1389 Commit: 49924cc6697f255a553dac8b3b896b10e00e3211 Parents: 532ed59 Author: Marko A. Rodriguez <[email protected]> Authored: Tue Oct 25 09:01:44 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Oct 25 09:01:44 2016 -0600 ---------------------------------------------------------------------- .../io/gryo/kryoshim/KryoShimService.java | 27 +--- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 140 +++---------------- .../gremlin/hadoop/structure/HadoopGraph.java | 7 +- .../structure/io/HadoopPoolShimService.java | 5 - ...n.structure.io.gryo.kryoshim.KryoShimService | 1 - .../process/computer/SparkGraphComputer.java | 26 ++-- .../unshaded/UnshadedKryoShimService.java | 13 +- ...n.structure.io.gryo.kryoshim.KryoShimService | 1 - .../SparkHadoopGraphGryoSerializerProvider.java | 6 +- .../computer/SparkHadoopGraphProvider.java | 6 - .../spark/structure/io/ToyGraphInputRDD.java | 1 - 11 files changed, 37 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java index b8880a4..f8abd4e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java @@ -57,38 +57,13 @@ public interface KryoShimService { /** * Serializes an object to an output stream. This may flush the output stream. * - * @param o the object to serialize + * @param o the object to serialize * @param sink the stream into which the serialized object is written */ public void writeClassAndObject(final Object o, final OutputStream sink); /** - * Returns this service's relative priority number. Unless explicitly overridden through a - * system property ({@link KryoShimServiceLoader#KRYO_SHIM_SERVICE}), - * the service implementation with the numerically highest priority will be used - * and all others ignored. In other words, the highest priority wins (in the absence of a - * system property override). - * <p> - * TinkerPop's current default implementation uses priority value zero. - * <p> - * Third-party implementations of this interface should (but are not technically required) - * to use a priority value with absolute value greater than 100. - * <p> - * The implementation currently breaks priority ties by lexicographical comparison of - * fully-qualified package-and-classname, but this tie-breaking behavior should be - * considered undefined and subject to future change. 
Ties are ignored if the service - * is explicitly set through the system property mentioned above. - * - * @return this implementation's priority value - */ - public int getPriority(); - - /** * Attempt to incorporate the supplied configuration in future read/write calls. - * <p> - * This method is a wart that exists essentially just to support the old - * {@link HadoopPools#initialize(Configuration)} use-case. - * <p> * This method is not guaranteed to have any effect on an instance of this interface * after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)} * has been invoked on that particular instance. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index c026130..ac815b1 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -24,9 +24,6 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.ServiceLoader; /** @@ -36,19 +33,18 @@ public class KryoShimServiceLoader { private static volatile KryoShimService cachedShimService; - private static volatile Configuration conf; + private static volatile Configuration configuration; private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class); /** * Set this system property to the fully-qualified 
name of a {@link KryoShimService} - * package-and-classname to force it into service. Setting this property causes the - * priority-selection mechanism ({@link KryoShimService#getPriority()}) to be ignored. + * package-and-classname to force it into service. */ public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService"; - public static void applyConfiguration(final Configuration conf) { - KryoShimServiceLoader.conf = conf; + public static void applyConfiguration(final Configuration configuration) { + KryoShimServiceLoader.configuration = configuration; load(true); } @@ -63,71 +59,23 @@ public class KryoShimServiceLoader { * @return the shim service */ public static KryoShimService load(final boolean forceReload) { - - if (null != cachedShimService && !forceReload) { + if (null != cachedShimService && !forceReload) return cachedShimService; - } - - final ArrayList<KryoShimService> services = new ArrayList<>(); - - final ServiceLoader<KryoShimService> sl = ServiceLoader.load(KryoShimService.class); - - KryoShimService result = null; - - synchronized (KryoShimServiceLoader.class) { - if (forceReload) { - sl.reload(); - } - - for (KryoShimService kss : sl) { - services.add(kss); - } - } - - String shimClass = System.getProperty(KRYO_SHIM_SERVICE); - - if (null != shimClass) { - for (KryoShimService kss : services) { - if (kss.getClass().getCanonicalName().equals(shimClass)) { - log.info("Set {} provider to {} ({}) from system property {}={}", - KryoShimService.class.getSimpleName(), kss, kss.getClass(), - KRYO_SHIM_SERVICE, shimClass); - result = kss; - } - } - } else { - Collections.sort(services, KryoShimServiceComparator.INSTANCE); - - for (KryoShimService kss : services) { - log.debug("Found Kryo shim service class {} (priority {})", kss.getClass(), kss.getPriority()); - } - - if (0 != services.size()) { - result = services.get(services.size() - 1); - - log.info("Set {} provider to {} ({}) because its priority value ({}) is the best available", - 
KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority()); - } - } - - - if (null == result) { - throw new IllegalStateException("Unable to load KryoShimService"); - } + if (!configuration.containsKey(KRYO_SHIM_SERVICE)) + throw new IllegalArgumentException("The provided configuration does not contain a " + KRYO_SHIM_SERVICE + " property"); - final Configuration userConf = conf; - - if (null != userConf) { - log.info("Configuring {} provider {} with user-provided configuration", - KryoShimService.class.getSimpleName(), result); - result.applyConfiguration(userConf); + try { + cachedShimService = ((Class<? extends KryoShimService>) Class.forName(configuration.getString(KRYO_SHIM_SERVICE))).newInstance(); + cachedShimService.applyConfiguration(configuration); + log.info("Using the following KryoShimService: " + cachedShimService.getClass().getCanonicalName()); + return cachedShimService; + } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalStateException(e.getMessage(), e); } - - return cachedShimService = result; } /** - * Equivalent to {@link #load(boolean)} with the parameter {@code true}. + * Equivalent to {@link #load(boolean)} with the parameter {@code false}. * * @return the (possibly cached) shim service */ @@ -136,7 +84,7 @@ public class KryoShimServiceLoader { } /** - * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject(Output, Object)}, + * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#writeClassAndObject}, * where the {@code output} parameter is an internally-created {@link ByteArrayOutputStream}. Returns * the byte array underlying that stream. 
* @@ -144,17 +92,13 @@ public class KryoShimServiceLoader { * @return the serialized form */ public static byte[] writeClassAndObjectToBytes(final Object o) { - final KryoShimService shimService = load(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - - shimService.writeClassAndObject(o, baos); - + load().writeClassAndObject(o, baos); return baos.toByteArray(); } /** - * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject(Input)}, + * A loose abstraction of {@link org.apache.tinkerpop.shaded.kryo.Kryo#readClassAndObject}, * where the {@code input} parameter is {@code source}. Returns the deserialized object. * * @param source an input stream containing data for a serialized object class and instance @@ -162,52 +106,6 @@ public class KryoShimServiceLoader { * @return the deserialized object */ public static <T> T readClassAndObject(final InputStream source) { - final KryoShimService shimService = load(); - - return (T) shimService.readClassAndObject(source); - } - - /** - * Selects the service with greatest {@link KryoShimService#getPriority()} - * (not absolute value). - * <p> - * Breaks ties with lexicographical comparison of classnames where the - * name that sorts last is considered to have highest priority. Ideally - * nothing should rely on that tiebreaking behavior, but it beats random - * selection in case a user ever gets into that situation by accident and - * tries to figure out what's going on. 
- */ - private enum KryoShimServiceComparator implements Comparator<KryoShimService> { - INSTANCE; - - @Override - public int compare(final KryoShimService a, final KryoShimService b) { - final int ap = a.getPriority(); - final int bp = b.getPriority(); - - if (ap < bp) { - return -1; - } else if (bp < ap) { - return 1; - } else { - final int result = a.getClass().getCanonicalName().compareTo(b.getClass().getCanonicalName()); - - if (0 == result) { - log.warn("Found two {} implementations with the same canonical classname: {}. " + - "This may indicate a problem with the classpath/classloader such as " + - "duplicate or conflicting copies of the file " + - "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.", - a.getClass().getCanonicalName()); - } else { - final String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName(); - log.warn("{} implementations {} and {} are tied with priority value {}. " + - "Preferring {} to the other because it has a lexicographically greater classname. 
" + - "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.", - KryoShimService.class.getSimpleName(), a, b, ap, winner, KRYO_SHIM_SERVICE); - } - - return result; - } - } + return (T) load().readClassAndObject(source); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java index d0f50d0..7043070 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java @@ -25,12 +25,14 @@ import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopEdgeIterator; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopVertexIterator; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Graph; import org.apache.tinkerpop.gremlin.structure.Transaction; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; @@ -109,7 +111,8 @@ import 
java.util.stream.Stream; test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals", method = "g_V_matchXa_0sungBy_b__a_0sungBy_c__b_writtenBy_d__c_writtenBy_e__d_hasXname_George_HarisonX__e_hasXname_Bob_MarleyXX", reason = "Hadoop-Gremlin is OLAP-oriented and for OLTP operations, linear-scan joins are required. This particular tests takes many minutes to execute.", - computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"}) // this is a nasty long test, just do it once in Java MatchTest + computers = {"org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer"}) +// this is a nasty long test, just do it once in Java MatchTest @Graph.OptOut( test = "org.apache.tinkerpop.gremlin.process.traversal.step.map.GroovyMatchTest$Traversals", method = "g_V_matchXa_0sungBy_b__a_0writtenBy_c__b_writtenBy_d__c_sungBy_d__d_hasXname_GarciaXX", @@ -262,6 +265,8 @@ public final class HadoopGraph implements Graph { private HadoopGraph(final Configuration configuration) { this.configuration = new HadoopConfiguration(configuration); + if (!this.configuration.containsKey(KryoShimServiceLoader.KRYO_SHIM_SERVICE)) + this.configuration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); } public static HadoopGraph open() { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java index 3fad4fd..f2e80ae 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java +++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -64,11 +64,6 @@ public class HadoopPoolShimService implements KryoShimService { } @Override - public int getPriority() { - return 0; - } - - @Override public void applyConfiguration(final Configuration conf) { HadoopPools.initialize(conf); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService deleted file mode 100644 index 0b27e72..0000000 --- a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService +++ /dev/null @@ -1 +0,0 @@ -org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index c7d0cfb..42f2493 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -69,8 +69,6 @@ import 
org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.io.Storage; @@ -97,7 +95,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { * An {@code ExecutorService} that schedules up background work. Since a {@link GraphComputer} is only used once * for a {@link VertexProgram} a single threaded executor is sufficient. */ - private final ExecutorService computerService = Executors.newSingleThreadExecutor(threadFactoryBoss); + private final ExecutorService computerService = Executors.newSingleThreadExecutor(this.threadFactoryBoss); static { TraversalStrategies.GlobalCache.registerStrategies(SparkGraphComputer.class, @@ -110,19 +108,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { super(hadoopGraph); this.sparkConfiguration = new HadoopConfiguration(); ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); - if (KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) && - GryoRegistrator.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_KRYO_REGISTRATOR, null))) { - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, UnshadedKryoShimService.class.getCanonicalName()); - } else if (GryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) && - 
!this.sparkConfiguration.containsKey(Constants.SPARK_KRYO_REGISTRATOR)) { - System.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()); - } - if (null != System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null)) { - final String shimService = System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE); - this.sparkConfiguration.setProperty(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); - this.sparkConfiguration.setProperty(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, - (this.sparkConfiguration.getString(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "") + " -D" + KryoShimServiceLoader.KRYO_SHIM_SERVICE + "=" + shimService).trim()); + if (HadoopPoolShimService.class.getCanonicalName().equals(this.sparkConfiguration.getString(KryoShimServiceLoader.KRYO_SHIM_SERVICE, HadoopPoolShimService.class.getCanonicalName()))) { + this.sparkConfiguration.setProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, + KryoSerializer.class.getCanonicalName().equals(this.sparkConfiguration.getString(Constants.SPARK_SERIALIZER, null)) ? 
+ UnshadedKryoShimService.class.getCanonicalName() : + HadoopPoolShimService.class.getCanonicalName()); } } @@ -154,8 +144,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { final long startTime = System.currentTimeMillis(); // apache and hadoop configurations that are used throughout the graph computer computation final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration); - if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) - graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); + //if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER)) + // graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES)); final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration); final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java index 0789d6a..5d963c9 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java +++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java @@ -44,14 +44,10 @@ import java.util.concurrent.LinkedBlockingQueue; public class UnshadedKryoShimService implements KryoShimService { private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class); - private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>(); - + private static final Configuration EMPTY_CONFIGURATION = new BaseConfiguration(); private static volatile boolean initialized; - public UnshadedKryoShimService() { - } - @Override public Object readClassAndObject(final InputStream source) { @@ -99,17 +95,12 @@ public class UnshadedKryoShimService implements KryoShimService { } @Override - public int getPriority() { - return -50; - } - - @Override public void applyConfiguration(final Configuration conf) { initialize(conf); } private LinkedBlockingQueue<Kryo> initialize() { - return initialize(new BaseConfiguration()); + return initialize(EMPTY_CONFIGURATION); } private LinkedBlockingQueue<Kryo> initialize(final Configuration conf) { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService deleted file mode 100644 index 68712a6..0000000 --- a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService +++ /dev/null @@ -1 +0,0 @@ -org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService # Supports Spark 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java index 9820b7b..33b538b 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java @@ -21,10 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.hadoop.Constants; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.spark.structure.Spark; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; @@ -34,9 +32,7 @@ import java.util.Map; public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider { public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - if (this.getClass().equals(SparkHadoopGraphGryoSerializerProvider.class) && - !HadoopPoolShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null))) - Spark.close(); + Spark.close(); final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); 
config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); config.remove(Constants.SPARK_KRYO_REGISTRATOR); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index c5b5083..014381e 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -42,9 +42,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck; import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Graph; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; @@ -56,10 +54,6 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider { @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - if (this.getClass().equals(SparkHadoopGraphProvider.class) && - !UnshadedKryoShimService.class.getCanonicalName().equals(System.getProperty(KryoShimServiceLoader.KRYO_SHIM_SERVICE, null))) - Spark.close(); - final Map<String, Object> config = 
super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/49924cc6/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java index 4cd8cea..d59b4e1 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java @@ -47,7 +47,6 @@ public final class ToyGraphInputRDD implements InputRDD { @Override public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) { - KryoShimServiceLoader.applyConfiguration(TinkerGraph.open().configuration()); final List<VertexWritable> vertices; if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern")) vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new));
