Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1389 064216eef -> 8b57096f4
okay. so I found a bug that has to do with joins() in Spark and IoRegistry.... I thought this whole time it was from me -- but then I realized that I added a .out() to the test. I reverted to an older version of the branch and added a .out(). failed. GryoRegistrator is being used as the Kryo source, not IoRegistryAwareKryoSerializer. It happens in the shuffle 'threads' and skips anything regarding KryoShimServiceLoader. I am so brain fried --- hopefully I realize the solution. For now, everything else is really good. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8b57096f Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8b57096f Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8b57096f Branch: refs/heads/TINKERPOP-1389 Commit: 8b57096f40f7fb085872cfc6b937e00cdf890de0 Parents: 064216e Author: Marko A. Rodriguez <[email protected]> Authored: Thu Oct 27 13:28:07 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Thu Oct 27 13:28:07 2016 -0600 ---------------------------------------------------------------------- .../giraph/GiraphGremlinIntegrateTest.java | 33 ++++++++ .../gremlin/giraph/GiraphGremlinTest.java | 33 -------- .../structure/io/GiraphIoRegistryCheck.java | 13 ++- .../gremlin/structure/io/IoRegistry.java | 3 + .../tinkerpop/gremlin/structure/io/Mapper.java | 19 +++++ .../gremlin/structure/io/gryo/GryoPool.java | 35 +------- .../structure/io/util/IoRegistryHelper.java | 84 ++++++++++++++++++++ .../gremlin/structure/io/gryo/GryoPoolTest.java | 25 +++--- .../hadoop/structure/io/HadoopPools.java | 3 +- .../io/graphson/GraphSONRecordReader.java | 13 ++- .../io/graphson/GraphSONRecordWriter.java | 12 ++- .../structure/io/gryo/GryoRecordReader.java | 12 ++- .../structure/io/gryo/GryoRecordWriter.java | 14 ++-- .../gremlin/hadoop/HadoopGraphProvider.java | 20 +++-- .../structure/io/AbstractIoRegistryCheck.java | 74 +++++++++-------- 
.../GraphSONRecordReaderWriterTest.java | 2 +- .../hadoop/structure/io/gryo/ToyIoRegistry.java | 30 +++++++ .../hadoop/structure/io/gryo/ToyPoint.java | 40 ++++++++++ .../hadoop/structure/io/gryo/ToyTriangle.java | 40 ++++++++++ .../spark/structure/io/gryo/GryoSerializer.java | 3 +- .../io/gryo/IoRegistryAwareKryoSerializer.java | 30 ++++--- .../unshaded/UnshadedKryoShimService.java | 1 + .../gremlin/spark/AbstractSparkTest.java | 2 + .../spark/SparkGremlinIntegrateTest.java | 33 ++++++++ .../gremlin/spark/SparkGremlinTest.java | 33 -------- .../computer/SparkHadoopGraphProvider.java | 8 ++ .../structure/io/SparkIoRegistryCheck.java | 7 +- 27 files changed, 432 insertions(+), 190 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinIntegrateTest.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinIntegrateTest.java new file mode 100644 index 0000000..955649c --- /dev/null +++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinIntegrateTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.giraph; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphHadoopGraphProvider; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.junit.runner.RunWith; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +@RunWith(GiraphGremlinSuite.class) +@GraphProviderClass(provider = GiraphHadoopGraphProvider.class, graph = HadoopGraph.class) +public class GiraphGremlinIntegrateTest { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinTest.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinTest.java deleted file mode 100644 index da638cf..0000000 --- a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/GiraphGremlinTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.giraph; - -import org.apache.tinkerpop.gremlin.GraphProviderClass; -import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphHadoopGraphProvider; -import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; -import org.junit.runner.RunWith; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -@RunWith(GiraphGremlinSuite.class) -@GraphProviderClass(provider = GiraphHadoopGraphProvider.class, graph = HadoopGraph.class) -public class GiraphGremlinTest { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphIoRegistryCheck.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphIoRegistryCheck.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphIoRegistryCheck.java index 17b6bac..51a2712 100644 --- a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphIoRegistryCheck.java +++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/structure/io/GiraphIoRegistryCheck.java @@ -20,12 +20,10 @@ package org.apache.tinkerpop.gremlin.giraph.structure.io; import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphGraphComputer; -import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractIoRegistryCheck; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,17 +36,24 @@ public class GiraphIoRegistryCheck extends AbstractIoRegistryCheck { @Before public void setup() throws Exception { super.setup(); + KryoShimServiceLoader.close(); HadoopPools.close(); } @After public void tearDown() throws Exception { super.tearDown(); + KryoShimServiceLoader.close(); HadoopPools.close(); } @Test - public void shouldSupportIoRegistry() throws Exception { + public void shouldSupportGryoIoRegistry() throws Exception { super.checkGryoIoRegistryCompliance((HadoopGraph) graph, GiraphGraphComputer.class); } + + @Test + public void shouldSupportGraphSONIoRegistry() throws Exception { + super.checkGraphSONIoRegistryCompliance((HadoopGraph) graph, GiraphGraphComputer.class); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/IoRegistry.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/IoRegistry.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/IoRegistry.java index a70e406..680a028 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/IoRegistry.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/IoRegistry.java @@ -48,6 +48,9 @@ import java.util.stream.Collectors; * @author Stephen Mallette (http://stephen.genoprime.com) */ public interface IoRegistry { + + public static final String IO_REGISTRY = 
"gremlin.io.registry"; + /** * Find a list of all the serializers registered to an {@link Io} class by the {@link Graph}. */ http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Mapper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Mapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Mapper.java index 4aeb3b7..08dd06d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Mapper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Mapper.java @@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.structure.io; import org.apache.tinkerpop.gremlin.structure.Graph; +import java.util.List; + /** * Represents a low-level serialization class that can be used to map classes to serializers. These implementation * create instances of serializers from other libraries (e.g. creating a {@code Kryo} instance). @@ -47,5 +49,22 @@ public interface Mapper<T> { * result in errors. */ public B addRegistry(final IoRegistry registry); + + /** + * Adds a vendor supplied {@link IoRegistry} to the {@code Mapper.Builder} which enables it to check for + * vendor custom serializers to add to the {@link Mapper}. All {@link Io} implementations should expose + * this method via this {@link Builder} so that it is compatible with {@link Graph#io}. Successive calls + * to this method will add multiple registries. Registry order must be respected when doing so. In + * other words, data written with {@link IoRegistry} {@code A} added first and {@code B} second must be read + * by a {@code Mapper} with that same registry ordering. Attempting to add {@code B} before {@code A} will + * result in errors. 
+ */ + public default B addRegistries(final List<IoRegistry> registries) { + B b = (B) this; + for (final IoRegistry registry : registries) { + b = this.addRegistry(registry); + } + return b; + } } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java index 59f8a5d..5fc15a3 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java @@ -19,9 +19,9 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import org.apache.tinkerpop.shaded.kryo.Kryo; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -38,7 +38,7 @@ import java.util.function.Function; * @author Stephen Mallette (http://stephen.genoprime.com) */ public final class GryoPool { - public static final String CONFIG_IO_REGISTRY = "gremlin.io.registry"; + public static final String CONFIG_IO_GRYO_POOL_SIZE = "gremlin.io.gryo.poolSize"; public static final int CONFIG_IO_GRYO_POOL_SIZE_DEFAULT = 256; @@ -153,7 +153,7 @@ public final class GryoPool { * @return the update builder */ public Builder ioRegistries(final List<Object> ioRegistryClassNames) { - this.ioRegistries.addAll(tryCreateIoRegistry(ioRegistryClassNames)); + this.ioRegistries.addAll(IoRegistryHelper.createRegistries(ioRegistryClassNames)); return this; } @@ -164,7 +164,7 @@ public final class GryoPool { * @return the update builder */ public Builder 
ioRegistry(final Object ioRegistryClassName) { - this.ioRegistries.addAll(tryCreateIoRegistry(Collections.singletonList(ioRegistryClassName))); + this.ioRegistries.addAll(IoRegistryHelper.createRegistries(Collections.singletonList(ioRegistryClassName))); return this; } @@ -216,32 +216,5 @@ public final class GryoPool { gryoPool.createPool(this.poolSize, this.type, mapper.create()); return gryoPool; } - - ///// - - private static List<IoRegistry> tryCreateIoRegistry(final List<Object> classNames) { - if (classNames.isEmpty()) return Collections.emptyList(); - - final List<IoRegistry> registries = new ArrayList<>(); - classNames.forEach(c -> { - try { - final String className = c.toString(); - final Class<?> clazz = Class.forName(className); - try { - final Method instanceMethod = clazz.getDeclaredMethod("getInstance"); - if (IoRegistry.class.isAssignableFrom(instanceMethod.getReturnType())) - registries.add((IoRegistry) instanceMethod.invoke(null)); - else - throw new Exception(); - } catch (Exception methodex) { - // tried getInstance() and that failed so try newInstance() no-arg constructor - registries.add((IoRegistry) clazz.newInstance()); - } - } catch (Exception ex) { - throw new IllegalStateException(ex); - } - }); - return registries; - } } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/util/IoRegistryHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/util/IoRegistryHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/util/IoRegistryHelper.java new file mode 100644 index 0000000..d1fac40 --- /dev/null +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/util/IoRegistryHelper.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.structure.io.util; + +import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class IoRegistryHelper { + + private IoRegistryHelper() { + + } + + public static List<IoRegistry> createRegistries(final List<Object> registryNamesClassesOrInstances) { + if (registryNamesClassesOrInstances.isEmpty()) return Collections.emptyList(); + + final List<IoRegistry> registries = new ArrayList<>(); + for (final Object object : registryNamesClassesOrInstances) { + if (object instanceof IoRegistry) + registries.add((IoRegistry) object); + else if (object instanceof String || object instanceof Class) { + try { + final Class<?> clazz = object instanceof String ? 
Class.forName((String) object) : (Class) object; + try { + final Method instanceMethod = clazz.getDeclaredMethod("getInstance"); + if (IoRegistry.class.isAssignableFrom(instanceMethod.getReturnType())) + registries.add((IoRegistry) instanceMethod.invoke(null)); + else + throw new Exception(); + } catch (final Exception methodex) { + // tried getInstance() and that failed so try newInstance() no-arg constructor + registries.add((IoRegistry) clazz.newInstance()); + } + } catch (final Exception ex) { + throw new IllegalStateException(ex.getMessage(), ex); + } + } else { + throw new IllegalArgumentException("The provided registry object can not be resolved to an instance: " + object); + } + } + return registries; + } + + public static List<IoRegistry> createRegistries(final Configuration configuration) { + if (configuration.containsKey(IoRegistry.IO_REGISTRY)) { + final Object property = configuration.getProperty(IoRegistry.IO_REGISTRY); + if (property instanceof IoRegistry) + return Collections.singletonList((IoRegistry) property); + else if (property instanceof List) + return createRegistries((List) property); + else if (property instanceof String) + return createRegistries(Arrays.asList(((String) property).split(","))); + else + throw new IllegalArgumentException("The provided registry object can not be resolved to an instance: " + property); + } else + return Collections.emptyList(); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java index 9db1ba4..f9029bb 100644 --- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java +++ 
b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPoolTest.java @@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.IoX; import org.apache.tinkerpop.gremlin.structure.io.IoXIoRegistry; import org.apache.tinkerpop.gremlin.structure.io.IoY; @@ -41,7 +42,7 @@ public class GryoPoolTest { @Test public void shouldDoWithReaderWriterMethods() throws Exception { final Configuration conf = new BaseConfiguration(); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); try (final ByteArrayOutputStream os = new ByteArrayOutputStream()) { pool.doWithWriter(writer -> writer.writeObject(os, 1)); os.flush(); @@ -55,14 +56,14 @@ public class GryoPoolTest { @Test public void shouldConfigPoolOnConstructionWithDefaults() throws Exception { final Configuration conf = new BaseConfiguration(); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); assertReaderWriter(pool.takeWriter(), pool.takeReader(), 1, Integer.class); } @Test public void shouldConfigPoolOnConstructionWithPoolSizeOneAndNoIoRegistry() throws Exception { final Configuration conf = new BaseConfiguration(); - final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, 
Collections.emptyList())).create(); final GryoReader reader = pool.takeReader(); final GryoWriter writer = pool.takeWriter(); @@ -86,25 +87,25 @@ public class GryoPoolTest { @Test public void shouldConfigPoolOnConstructionWithCustomIoRegistryConstructor() throws Exception { final Configuration conf = new BaseConfiguration(); - conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.ConstructorBased.class.getName()); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + conf.setProperty(IoRegistry.IO_REGISTRY, IoXIoRegistry.ConstructorBased.class.getName()); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class); } @Test public void shouldConfigPoolOnConstructionWithCustomIoRegistryInstance() throws Exception { final Configuration conf = new BaseConfiguration(); - conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, IoXIoRegistry.InstanceBased.class.getName()); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + conf.setProperty(IoRegistry.IO_REGISTRY, IoXIoRegistry.InstanceBased.class.getName()); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class); } @Test public void shouldConfigPoolOnConstructionWithMultipleCustomIoRegistries() throws Exception { final Configuration conf = new BaseConfiguration(); - conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, + conf.setProperty(IoRegistry.IO_REGISTRY, IoXIoRegistry.InstanceBased.class.getName() + "," + IoYIoRegistry.InstanceBased.class.getName()); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, 
Collections.emptyList())).create(); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class); assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoY(100, 200), IoY.class); } @@ -112,15 +113,15 @@ public class GryoPoolTest { @Test(expected = IllegalArgumentException.class) public void shouldConfigPoolOnConstructionWithoutCustomIoRegistryAndFail() throws Exception { final Configuration conf = new BaseConfiguration(); - final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + final GryoPool pool = GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); assertReaderWriter(pool.takeWriter(), pool.takeReader(), new IoX("test"), IoX.class); } @Test(expected = IllegalStateException.class) public void shouldConfigPoolOnConstructionWithoutBadIoRegistryAndFail() throws Exception { final Configuration conf = new BaseConfiguration(); - conf.setProperty(GryoPool.CONFIG_IO_REGISTRY, "some.class.that.does.not.exist"); - GryoPool.build().ioRegistries(conf.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).create(); + conf.setProperty(IoRegistry.IO_REGISTRY, "some.class.that.does.not.exist"); + GryoPool.build().ioRegistries(conf.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())).create(); } private static <T> void assertReaderWriter(final GryoWriter writer, final GryoReader reader, final T o, http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java index e652509..43e4f29 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java @@ -22,6 +22,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationUtils; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; import org.apache.tinkerpop.gremlin.util.SystemUtil; @@ -43,7 +44,7 @@ public final class HadoopPools { INITIALIZED = true; GRYO_POOL = GryoPool.build(). poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)). - ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())). + ioRegistries(configuration.getList(IoRegistry.IO_REGISTRY, Collections.emptyList())). initializeMapper(m -> m.registrationRequired(false)). 
create(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java index 828c85d..07bd303 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReader.java @@ -25,8 +25,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.LineRecordReader; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONReader; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion; +import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import org.apache.tinkerpop.gremlin.structure.util.Attachable; import java.io.ByteArrayInputStream; @@ -38,7 +43,7 @@ import java.io.InputStream; */ public final class GraphSONRecordReader extends RecordReader<NullWritable, VertexWritable> { - private final GraphSONReader graphsonReader = GraphSONReader.build().create(); + private GraphSONReader graphsonReader; private final VertexWritable vertexWritable = new VertexWritable(); private final LineRecordReader 
lineRecordReader; private boolean hasEdges; @@ -51,6 +56,11 @@ public final class GraphSONRecordReader extends RecordReader<NullWritable, Verte public void initialize(final InputSplit genericSplit, final TaskAttemptContext context) throws IOException { this.lineRecordReader.initialize(genericSplit, context); this.hasEdges = context.getConfiguration().getBoolean(Constants.GREMLIN_HADOOP_GRAPH_READER_HAS_EDGES, true); + this.graphsonReader = GraphSONReader.build().mapper( + GraphSONMapper.build(). + version(GraphSONVersion.V2_0). + typeInfo(TypeInfo.PARTIAL_TYPES). + addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(context.getConfiguration()))).create()).create(); } @Override @@ -84,5 +94,6 @@ public final class GraphSONRecordReader extends RecordReader<NullWritable, Verte @Override public synchronized void close() throws IOException { this.lineRecordReader.close(); + this.graphsonReader = null; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java index b558894..5ea058f 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordWriter.java @@ -24,8 +24,13 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import 
org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONVersion; import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter; +import org.apache.tinkerpop.gremlin.structure.io.graphson.TypeInfo; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import java.io.DataOutputStream; import java.io.IOException; @@ -39,7 +44,7 @@ public final class GraphSONRecordWriter extends RecordWriter<NullWritable, Verte private static final byte[] NEWLINE; private final DataOutputStream outputStream; private final boolean hasEdges; - private final GraphSONWriter graphsonWriter = GraphSONWriter.build().create(); + private final GraphSONWriter graphsonWriter; static { @@ -53,6 +58,11 @@ public final class GraphSONRecordWriter extends RecordWriter<NullWritable, Verte public GraphSONRecordWriter(final DataOutputStream outputStream, final Configuration configuration) { this.outputStream = outputStream; this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true); + this.graphsonWriter = GraphSONWriter.build().mapper( + GraphSONMapper.build(). + version(GraphSONVersion.V2_0). + typeInfo(TypeInfo.PARTIAL_TYPES). 
+ addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(configuration))).create()).create(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java index d7ed46b..8b0a94d 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java @@ -28,15 +28,16 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.tinkerpop.gremlin.hadoop.Constants; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper; import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader; import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -71,8 +72,8 @@ public final class GryoRecordReader extends RecordReader<NullWritable, 
VertexWri final Configuration configuration = context.getConfiguration(); if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER); - HadoopPools.initialize(configuration); - this.gryoReader = HadoopPools.getGryoPool().takeReader(); + this.gryoReader = GryoReader.build().mapper( + GryoMapper.build().addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(configuration))).create()).create(); long start = split.getStart(); final Path file = split.getPath(); if (null != new CompressionCodecFactory(configuration).getCodec(file)) { @@ -165,9 +166,6 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri @Override public synchronized void close() throws IOException { this.inputStream.close(); - if (null != this.gryoReader) { - HadoopPools.getGryoPool().offerReader(this.gryoReader); - this.gryoReader = null; - } + this.gryoReader = null; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java index 67a8339..e5ee90e 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java @@ -23,10 +23,13 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tinkerpop.gremlin.hadoop.Constants; 
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.structure.Direction; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import java.io.DataOutputStream; import java.io.IOException; @@ -43,8 +46,8 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri public GryoRecordWriter(final DataOutputStream outputStream, final Configuration configuration) { this.outputStream = outputStream; this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true); - HadoopPools.initialize(configuration); - this.gryoWriter = HadoopPools.getGryoPool().takeWriter(); + this.gryoWriter = GryoWriter.build().mapper( + GryoMapper.build().addRegistries(IoRegistryHelper.createRegistries(ConfUtil.makeApacheConfiguration(configuration))).create()).create(); } @Override @@ -60,9 +63,6 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri @Override public synchronized void close(final TaskAttemptContext context) throws IOException { this.outputStream.close(); - if (null != this.gryoWriter) { - HadoopPools.getGryoPool().offerWriter(this.gryoWriter); - this.gryoWriter = null; - } + this.gryoWriter = null; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java index c95ede5..0834bb5 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java @@ -45,8 +45,6 @@ import java.util.Map; import java.util.Random; import java.util.Set; -import static org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader.KRYO_SHIM_SERVICE; - /** * @author Marko A. Rodriguez (http://markorodriguez.com) * @author Stephen Mallette (http://stephen.genoprime.com) @@ -86,10 +84,10 @@ public class HadoopGraphProvider extends AbstractGraphProvider { } final List<String> graphsonResources = Arrays.asList( - "tinkerpop-modern.json", - "grateful-dead.json", - "tinkerpop-classic.json", - "tinkerpop-crew.json"); + "tinkerpop-modern-v2d0-typed.json", + "grateful-dead-v2d0-typed.json", + "tinkerpop-classic-v2d0-typed.json", + "tinkerpop-crew-v2d0-typed.json"); for (final String fileName : graphsonResources) { PATHS.put(fileName, TestHelper.generateTempFileFromResource(GraphSONResourceAccess.class, fileName, "").getAbsolutePath().replace('\\', '/')); } @@ -138,16 +136,16 @@ public class HadoopGraphProvider extends AbstractGraphProvider { } public void loadGraphDataViaHadoopConfig(final Graph g, final LoadGraphWith.GraphData graphData) { - final String type = this.graphSONInput ? "json" : "kryo"; + final String type = this.graphSONInput ? "-v2d0-typed.json" : ".kryo"; if (graphData.equals(LoadGraphWith.GraphData.GRATEFUL)) { - ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("grateful-dead." + type)); + ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("grateful-dead" + type)); } else if (graphData.equals(LoadGraphWith.GraphData.MODERN)) { - ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-modern." 
+ type)); + ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-modern" + type)); } else if (graphData.equals(LoadGraphWith.GraphData.CLASSIC)) { - ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-classic." + type)); + ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-classic" + type)); } else if (graphData.equals(LoadGraphWith.GraphData.CREW)) { - ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-crew." + type)); + ((HadoopGraph) g).configuration().setInputLocation(PATHS.get("tinkerpop-crew" + type)); } else { throw new RuntimeException("Could not load graph with " + graphData); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java index 9002d57..06ff5bf 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @@ -29,6 +28,9 @@ import org.apache.tinkerpop.gremlin.AbstractGremlinTest; import org.apache.tinkerpop.gremlin.TestHelper; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONRecordWriter; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordWriter; @@ -39,14 +41,12 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; -import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -59,52 +59,62 @@ import static org.junit.Assert.assertTrue; */ public abstract class AbstractIoRegistryCheck extends AbstractGremlinTest { + private static final int NUMBER_OF_VERTICES = 1000; + public void checkGryoIoRegistryCompliance(final HadoopGraph graph, final Class<? 
extends GraphComputer> graphComputerClass) throws Exception { final File input = TestHelper.generateTempFile(this.getClass(), "gryo-io-registry", ".kryo"); graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); - graph.configuration().setProperty(GryoPool.CONFIG_IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); + graph.configuration().setProperty(IoRegistry.IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration())); - validateIoRegistryGraph(graph, graphComputerClass, writer, GryoInputFormat.class); + validateIoRegistryGraph(graph, graphComputerClass, writer); + assertTrue(input.delete()); + } + + public void checkGraphSONIoRegistryCompliance(final HadoopGraph graph, final Class<? 
extends GraphComputer> graphComputerClass) throws Exception { + final File input = TestHelper.generateTempFile(this.getClass(), "graphson-io-registry", ".json"); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GraphSONInputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GraphSONOutputFormat.class.getCanonicalName()); + graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); + graph.configuration().setProperty(IoRegistry.IO_REGISTRY, ToyIoRegistry.class.getCanonicalName()); + final GraphSONRecordWriter writer = new GraphSONRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(graph.configuration())); + validateIoRegistryGraph(graph, graphComputerClass, writer); assertTrue(input.delete()); } private void validateIoRegistryGraph(final HadoopGraph graph, final Class<? extends GraphComputer> graphComputerClass, - final RecordWriter<NullWritable, VertexWritable> writer, - final Class<? extends InputFormat<NullWritable, VertexWritable>> inputFormat) throws Exception { - for (int i = 0; i < 10; i++) { + final RecordWriter<NullWritable, VertexWritable> writer) throws Exception { + + + for (int i = 0; i < NUMBER_OF_VERTICES; i++) { final StarGraph starGraph = StarGraph.open(); - starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i, "triangle", new ToyTriangle(i, i * 10, i * 100)); + Vertex vertex = starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i, "triangle", new ToyTriangle(i, i * 10, i * 100)); + vertex.addEdge("connection", starGraph.addVertex(T.id, i > 0 ? 
i - 1 : NUMBER_OF_VERTICES - 1)); writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex())); } writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(graph.configuration()), new TaskAttemptID())); + // OLAP TESTING // - final List<Map<String, Object>> values = graph.traversal().withComputer(graphComputerClass).V().valueMap("point", "triangle").toList(); - assertEquals(10, values.size()); - // System.out.println(values); - for (int i = 0; i < 10; i++) { - assertTrue(values.stream().map(m -> m.get("point")).flatMap(l -> ((List<ToyPoint>) l).stream()).collect(Collectors.toList()).contains(new ToyPoint(i, i * 10))); - assertTrue(values.stream().map(m -> m.get("triangle")).flatMap(l -> ((List<ToyTriangle>) l).stream()).collect(Collectors.toList()).contains(new ToyTriangle(i, i * 10, i * 100))); - } - values.clear(); + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().project("point", "triangle").by("point").by("triangle").toList()); + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().project("point", "triangle").by("point").by("triangle").toList()); + validatePointTriangles(graph.traversal().withComputer(graphComputerClass).V().out().out().project("point", "triangle").by("point").by("triangle").toList()); // OLTP TESTING // - graph.traversal().V().valueMap("point", "triangle").fill(values); - assertEquals(10, values.size()); - for (int i = 0; i < 10; i++) { - assertTrue(values.stream().map(m -> m.<List<ToyPoint>>get("point")).flatMap(l -> ((List<ToyPoint>) l).stream()).collect(Collectors.toList()).contains(new ToyPoint(i, i * 10))); - assertTrue(values.stream().map(m -> m.<List<ToyTriangle>>get("triangle")).flatMap(l -> ((List<ToyTriangle>) l).stream()).collect(Collectors.toList()).contains(new ToyTriangle(i, i * 10, i * 100))); - } - values.clear(); + validatePointTriangles(graph.traversal().V().project("point", "triangle").by("point").by("triangle").toList()); // HDFS 
TESTING // - final List<Vertex> list = IteratorUtils.asList(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())).head(graph.configuration().getInputLocation(), inputFormat)); - list.forEach(v -> values.add(new HashMap<String, Object>() {{ - put("point", v.value("point")); - put("triangle", v.value("triangle")); - }})); - assertEquals(10, values.size()); - for (int i = 0; i < 10; i++) { + /*validatePointTriangles(IteratorUtils.<Map<String, Object>>asList(IteratorUtils.<Vertex, Map<String, Object>>map(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())).head(graph.configuration().getInputLocation(), graph.configuration().getGraphReader()), + vertex -> { + return new HashMap<String, Object>() {{ + put("point", vertex.value("point")); + put("triangle", vertex.value("triangle")); + }}; + })));*/ + } + + private void validatePointTriangles(final List<Map<String, Object>> values) { + assertEquals(NUMBER_OF_VERTICES, values.size()); + for (int i = 0; i < NUMBER_OF_VERTICES; i++) { assertTrue(values.stream().map(m -> m.<ToyPoint>get("point")).collect(Collectors.toList()).contains(new ToyPoint(i, i * 10))); assertTrue(values.stream().map(m -> m.<ToyTriangle>get("triangle")).collect(Collectors.toList()).contains(new ToyTriangle(i, i * 10, i * 100))); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java index 2424184..903bef5 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java +++ 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/graphson/GraphSONRecordReaderWriterTest.java @@ -32,7 +32,7 @@ public class GraphSONRecordReaderWriterTest extends RecordReaderWriterTest { @Override protected String getInputFilename() { - return "grateful-dead.json"; + return "grateful-dead-v2d0-typed.json"; } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyIoRegistry.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyIoRegistry.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyIoRegistry.java index 78094a1..515d213 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyIoRegistry.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyIoRegistry.java @@ -20,8 +20,13 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo; import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONIo; +import org.apache.tinkerpop.gremlin.structure.io.graphson.TinkerPopJacksonModule; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import java.util.HashMap; +import java.util.Map; + /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) */ @@ -32,6 +37,31 @@ public final class ToyIoRegistry extends AbstractIoRegistry { private ToyIoRegistry() { super.register(GryoIo.class, ToyPoint.class, new ToyPoint.ToyPointSerializer()); super.register(GryoIo.class, ToyTriangle.class, new ToyTriangle.ToyTriangleSerializer()); + super.register(GraphSONIo.class, null, new ToyModule()); + } + + public static class ToyModule extends TinkerPopJacksonModule { + public ToyModule() { + super("toy"); + addSerializer(ToyPoint.class, new ToyPoint.ToyPointJacksonSerializer()); + addDeserializer(ToyPoint.class, new ToyPoint.ToyPointJacksonDeSerializer()); + addSerializer(ToyTriangle.class, new ToyTriangle.ToyTriangleJacksonSerializer()); + addDeserializer(ToyTriangle.class, new ToyTriangle.ToyTriangleJacksonDeSerializer()); + } + + + @Override + public Map<Class, String> getTypeDefinitions() { + return new HashMap<Class, String>() {{ + put(ToyPoint.class, "ToyPoint"); + put(ToyTriangle.class, "ToyTriangle"); + }}; + } + + @Override + public String getTypeNamespace() { + return "toy"; + } } public static ToyIoRegistry getInstance() { http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyPoint.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyPoint.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyPoint.java index b79d6c6..d0a5ca3 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyPoint.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyPoint.java @@ -19,10 +19,22 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import 
org.apache.tinkerpop.gremlin.structure.io.graphson.AbstractObjectDeserializer; +import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONTokens; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; +import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; +import org.apache.tinkerpop.shaded.jackson.core.JsonGenerationException; +import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator; +import org.apache.tinkerpop.shaded.jackson.databind.SerializerProvider; +import org.apache.tinkerpop.shaded.jackson.databind.ser.std.StdScalarSerializer; +import org.apache.tinkerpop.shaded.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Map; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -70,4 +82,32 @@ public final class ToyPoint { return new ToyPoint(input.readInt(), input.readInt()); } } + + public static class ToyPointJacksonSerializer extends StdScalarSerializer<ToyPoint> { + + public ToyPointJacksonSerializer() { + super(ToyPoint.class); + } + + @Override + public void serialize(final ToyPoint toyPoint, final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException, JsonGenerationException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("x", toyPoint.x); + jsonGenerator.writeObjectField("y", toyPoint.y); + jsonGenerator.writeEndObject(); + } + } + + public static class ToyPointJacksonDeSerializer extends AbstractObjectDeserializer<ToyPoint> { + + public ToyPointJacksonDeSerializer() { + super(ToyPoint.class); + } + + @Override + public ToyPoint createObject(final Map<String, Object> map) { + return new ToyPoint((int) map.get("x"), (int) map.get("y")); + } + } } 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyTriangle.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyTriangle.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyTriangle.java index 6e744e8..280cd01 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyTriangle.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/ToyTriangle.java @@ -19,10 +19,19 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo; +import org.apache.tinkerpop.gremlin.structure.io.graphson.AbstractObjectDeserializer; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; +import org.apache.tinkerpop.shaded.jackson.core.JsonGenerationException; +import org.apache.tinkerpop.shaded.jackson.core.JsonGenerator; +import org.apache.tinkerpop.shaded.jackson.databind.SerializerProvider; +import org.apache.tinkerpop.shaded.jackson.databind.ser.std.StdScalarSerializer; +import org.apache.tinkerpop.shaded.jackson.databind.ser.std.StdSerializer; + +import java.io.IOException; +import java.util.Map; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -77,4 +86,35 @@ public final class ToyTriangle { return new ToyTriangle(input.readInt(), input.readInt(), input.readInt()); } } + + + public static class ToyTriangleJacksonSerializer extends StdScalarSerializer<ToyTriangle> { + + public ToyTriangleJacksonSerializer() { + super(ToyTriangle.class); + } + + @Override + public void serialize(final ToyTriangle toyTriangle, final JsonGenerator jsonGenerator, + final SerializerProvider serializerProvider) throws IOException, JsonGenerationException { + jsonGenerator.writeStartObject(); + jsonGenerator.writeObjectField("x", toyTriangle.x); + jsonGenerator.writeObjectField("y", toyTriangle.y); + jsonGenerator.writeObjectField("z", toyTriangle.z); + jsonGenerator.writeEndObject(); + } + } + + public static class ToyTriangleJacksonDeSerializer extends AbstractObjectDeserializer<ToyTriangle> { + + public ToyTriangleJacksonDeSerializer() { + super(ToyTriangle.class); + } + + @Override + public ToyTriangle createObject(final Map<String, Object> map) { + return new ToyTriangle((int) map.get("x"), (int) map.get("y"), (int) map.get("z")); + } + } + } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java index 3bdf81f..60c0873 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java @@ -42,6 +42,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingP import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; import org.apache.tinkerpop.shaded.kryo.io.Output; @@ -86,7 +87,7 @@ public final class GryoSerializer extends Serializer implements Serializable { } // create a GryoPool and store it in static HadoopPools final List<Object> ioRegistries = new ArrayList<>(); - ioRegistries.addAll(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())); + ioRegistries.addAll(makeApacheConfiguration(sparkConfiguration).getList(IoRegistry.IO_REGISTRY, Collections.emptyList())); ioRegistries.add(SparkIoRegistry.class.getCanonicalName().replace("." + SparkIoRegistry.class.getSimpleName(), "$" + SparkIoRegistry.class.getSimpleName())); HadoopPools.initialize(GryoPool.build(). poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)). 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java index 1385a5b..44a7464 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java @@ -25,22 +25,24 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; import com.esotericsoftware.kryo.Kryo; +import org.apache.commons.configuration.BaseConfiguration; +import org.apache.commons.configuration.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.serializer.KryoSerializer; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter; +import org.apache.tinkerpop.gremlin.structure.io.util.IoRegistryHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; /** - * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}. + * A {@link KryoSerializer} that attempts to honor {@link IoRegistry#IO_REGISTRY}. 
*/ public final class IoRegistryAwareKryoSerializer extends KryoSerializer { @@ -50,11 +52,13 @@ public final class IoRegistryAwareKryoSerializer extends KryoSerializer { public IoRegistryAwareKryoSerializer(final SparkConf configuration) { super(configuration); - if (!configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) - log.info("SparkConf does not contain a {} property. Skipping {} processing.", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName()); + if (!configuration.contains(IoRegistry.IO_REGISTRY)) + log.info("SparkConf does not contain a {} property. Skipping {} processing.", IoRegistry.IO_REGISTRY, IoRegistry.class.getCanonicalName()); else { - final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create(); - for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) { + final Configuration apacheConfiguration = new BaseConfiguration(); + apacheConfiguration.setProperty(IoRegistry.IO_REGISTRY, configuration.get(IoRegistry.IO_REGISTRY)); + final GryoMapper mapper = GryoMapper.build().addRegistries(IoRegistryHelper.createRegistries(apacheConfiguration)).create(); + for (final TypeRegistration<?> type : mapper.getTypeRegistrations()) { log.info("Registering {} with serializer type: {}", type.getTargetClass().getCanonicalName(), type); this.typeRegistrations.add(type); } @@ -65,12 +69,12 @@ public final class IoRegistryAwareKryoSerializer extends KryoSerializer { public Kryo newKryo() { final Kryo kryo = super.newKryo(); for (final TypeRegistration<?> type : this.typeRegistrations) { - if (null != type.getSerializerShim()) - kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(type.getSerializerShim()), type.getId()); - else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter) - kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) 
type.getShadedSerializer()).getSerializerShim()), type.getId()); - else - kryo.register(type.getTargetClass(), kryo.getDefaultSerializer(type.getTargetClass()), type.getId()); + if (null != type.getSerializerShim()) + kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(type.getSerializerShim()), type.getId()); + else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter) + kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) type.getShadedSerializer()).getSerializerShim()), type.getId()); + else + kryo.register(type.getTargetClass(), kryo.getDefaultSerializer(type.getTargetClass()), type.getId()); } return kryo; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java index caf5268..0998a9f 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java @@ -110,6 +110,7 @@ public class UnshadedKryoShimService implements KryoShimService { final IoRegistryAwareKryoSerializer ioRegistrySerializer = new IoRegistryAwareKryoSerializer(sparkConf); // Setup a pool backed by our spark.serializer instance // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf) + KRYOS.clear(); final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 
GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT); for (int i = 0; i < poolSize; i++) { KRYOS.add(ioRegistrySerializer.newKryo()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java index c2bc90f..ec9bf89 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java @@ -31,6 +31,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.spark.structure.Spark; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.junit.After; import org.junit.Before; import org.slf4j.Logger; @@ -53,6 +54,7 @@ public abstract class AbstractSparkTest { Spark.create(sparkContext.sc()); Spark.close(); HadoopPools.close(); + KryoShimServiceLoader.close(); logger.info("SparkContext has been closed for " + this.getClass().getCanonicalName() + "-setupTest"); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinIntegrateTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinIntegrateTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinIntegrateTest.java new file mode 100644 index 0000000..f8075ee --- /dev/null +++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinIntegrateTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark; + +import org.apache.tinkerpop.gremlin.GraphProviderClass; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; +import org.junit.runner.RunWith; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +@RunWith(SparkGremlinSuite.class) +@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) +public class SparkGremlinIntegrateTest { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinTest.java deleted file mode 100644 index cce9784..0000000 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/SparkGremlinTest.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.spark; - -import org.apache.tinkerpop.gremlin.GraphProviderClass; -import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; -import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider; -import org.junit.runner.RunWith; - -/** - * @author Marko A. 
Rodriguez (http://markorodriguez.com) - */ -@RunWith(SparkGremlinSuite.class) -@GraphProviderClass(provider = SparkHadoopGraphProvider.class, graph = HadoopGraph.class) -public class SparkGremlinTest { -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index dcec3f8..6cdcb67 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -27,6 +27,8 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck; import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.ToyIoRegistry; import org.apache.tinkerpop.gremlin.process.computer.Computer; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -40,9 +42,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest; import org.apache.tinkerpop.gremlin.spark.structure.Spark; import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck; +import 
org.apache.tinkerpop.gremlin.spark.structure.io.SparkIoRegistryCheck; import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; @@ -59,6 +64,7 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider { public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) { Spark.close(); + HadoopPools.close(); KryoShimServiceLoader.close(); System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName()); } @@ -89,6 +95,8 @@ public class SparkHadoopGraphProvider extends HadoopGraphProvider { // sugar plugin causes meta-method issues with a persisted context if (test.equals(HadoopGremlinPluginCheck.class)) { Spark.close(); + HadoopPools.close(); + KryoShimServiceLoader.close(); SugarTestHelper.clearRegistry(this); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8b57096f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java index 54ed4ed..948bbbe 100644 --- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java @@ -24,9 +24,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractIoRegistryCheck; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; import org.apache.tinkerpop.gremlin.spark.structure.Spark; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; -import org.apache.tinkerpop.gremlin.util.SystemUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -58,4 +56,9 @@ public class SparkIoRegistryCheck extends AbstractIoRegistryCheck { public void shouldSupportGryoIoRegistry() throws Exception { super.checkGryoIoRegistryCompliance((HadoopGraph) graph, SparkGraphComputer.class); } + + @Test + public void shouldSupportGraphSONIoRegistry() throws Exception { + super.checkGraphSONIoRegistryCompliance((HadoopGraph) graph, SparkGraphComputer.class); + } }
