[ 
https://issues.apache.org/jira/browse/TINKERPOP-2681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17485896#comment-17485896
 ] 

ASF GitHub Bot commented on TINKERPOP-2681:
-------------------------------------------

divijvaidya commented on a change in pull request #1555:
URL: https://github.com/apache/tinkerpop/pull/1555#discussion_r797686317



##########
File path: gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeEdgeStep.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeE()} step covering both the start step version and the one used mid-traversal.
+ */
+public class MergeEdgeStep<S> extends FlatMapStep<S, Edge> implements Mutating<Event>,
+        TraversalOptionParent<Merge, S, Edge> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart) {
+        this(traversal, isStart, new IdentityTraversal<>());
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Traversal.Admin<?,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {

Review comment:
       Please add Javadoc for all public methods.

##########
File path: gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeEdgeStep.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeE()} step covering both the start step version and the one used mid-traversal.
+ */
+public class MergeEdgeStep<S> extends FlatMapStep<S, Edge> implements Mutating<Event>,
+        TraversalOptionParent<Merge, S, Edge> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart) {
+        this(traversal, isStart, new IdentityTraversal<>());
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Traversal.Admin<?,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, Edge> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into this step. this exception should not

Review comment:
       Please add a test case to validate that (if one has not already been added).

##########
File path: gremlin-test/features/map/MergeEdge.feature
##########
@@ -0,0 +1,201 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+@StepClassMap @StepMergeE
+Feature: Step - mergeE()
+
+  Scenario: g_V_mergeEXlabel_self_weight_05X
+    Given the empty graph
+    And the graph initializer of
+      """
+      g.addV("person").property("name", "marko").property("age", 29)
+      """
+    And using the parameter xx1 defined as "m[{\"t[label]\": \"self\", \"weight\":\"d[0.5].d\"}]"
+    And the traversal of
+      """
+      g.V().mergeE(xx1)
+      """
+    When iterated to list
+    Then the result should have a count of 1
+    And the graph should return 1 for count of "g.E().hasLabel(\"self\").has(\"weight\",0.5)"

Review comment:
       Please also assert here that the edge is connected to the correct vertex (marko).

##########
File path: gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeVertexStep.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeV()} step covering both the start step version and the one used mid-traversal.
+ */
+public class MergeVertexStep<S> extends FlatMapStep<S, Vertex> implements Mutating<Event>,
+        TraversalOptionParent<Merge, S, Vertex> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean isStart) {
+        this(traversal, isStart, new IdentityTraversal());
+    }
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean isStart, final Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, Vertex> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into this step. this exception should not
+        // end up visible to users really
+    }
+
+    @Override
+    public Parameters getParameters() {
+        // merge doesn't take fold ups of property() calls. those need to get treated as regular old PropertyStep
+        // instances. not sure if this should support with() though.....none of the other Mutating steps do.
+        return null;
+    }
+
+    @Override
+    protected Traverser.Admin<Vertex> processNextStart() {
+        // when it's a start step a traverser needs to be created to kick off the traversal.
+        if (isStart && first) {
+            first = false;
+            final TraverserGenerator generator = this.getTraversal().getTraverserGenerator();
+            this.addStart(generator.generate(false, (Step) this, 1L));
+        }
+        return super.processNextStart();
+    }
+
+    /**
+     * Use the {@code Map} of search criteria to most efficiently return a {@code Stream<Vertex>} of matching elements.
+     * Providers might override this method when extending this step to provide their own optimized mechanisms for
+     * matching the list of vertices. This implementation is only optimized for the {@link T#id} so any other usage
+     * will simply be in-memory filtering which could be slow.
+     */
+    protected Stream<Vertex> createSearchStream(final Map<Object,Object> search) {
+        final Graph graph = this.getTraversal().getGraph().get();
+
+        Stream<Vertex> stream;
+        // prioritize lookup by id
+        if (search.containsKey(T.id))
+            stream = IteratorUtils.stream(graph.vertices(search.get(T.id)));
+        else
+            stream = IteratorUtils.stream(graph.vertices());
+
+        // in-memory filter is not going to be nice here. it will be up to graphs to optimize this step as they do
+        // for other Mutation steps
+        stream = stream.filter(v -> {
+            // try to match on all search criteria skipping T.id as it was handled above
+            return search.entrySet().stream().filter(kv -> kv.getKey() != T.id).allMatch(kv -> {
+                if (kv.getKey() == T.label) {
+                    return v.label().equals(kv.getValue());
+                } else {
+                    final VertexProperty<Object> vp = v.property(kv.getKey().toString());
+                    return vp.isPresent() && kv.getValue().equals(vp.value());
+                }
+            });
+        });
+
+        return stream;
+    }
+
+    @Override
+    protected Iterator<Vertex> flatMap(final Traverser.Admin<S> traverser) {
+        final Map<Object,Object> searchCreate = TraversalUtil.apply(traverser, searchCreateTraversal);
+
+        Stream<Vertex> stream = createSearchStream(searchCreate);
+        stream = stream.map(v -> {
+            // if no onMatch is defined then there is no update - return the vertex unchanged
+            if (null == onMatchTraversal) return v;
+
+            // assume good input from GraphTraversal - folks might drop in a T here even though it is immutable
+            final Map<String, Object> onMatchMap = TraversalUtil.apply(traverser, onMatchTraversal);
+            onMatchMap.forEach((key, value) -> {
+                // trigger callbacks for eventing - in this case, it's a VertexPropertyChangedEvent. if there's no
+                // registry/callbacks then just set the property
+                if (this.callbackRegistry != null && !callbackRegistry.getCallbacks().isEmpty()) {
+                    final EventStrategy eventStrategy = getTraversal().getStrategies().getStrategy(EventStrategy.class).get();
+                    final Property<?> p = v.property(key);
+                    final Property<Object> oldValue = p.isPresent() ? eventStrategy.detach(v.property(key)) : null;
+                    final Event.VertexPropertyChangedEvent vpce = new Event.VertexPropertyChangedEvent(eventStrategy.detach(v), oldValue, value);
+                    this.callbackRegistry.getCallbacks().forEach(c -> c.accept(vpce));
+                }
+                v.property(key, value);

Review comment:
       This line assumes that the cardinality will be the default one, which is incorrect.
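
The cardinality concern can be illustrated in isolation. The sketch below does not use the TinkerPop API: `SimpleVertex`, `applyOnMatch` and the local `Cardinality` enum are hypothetical stand-ins, with the enum constants named after `VertexProperty.Cardinality` (`single`, `list`, `set`). It only shows that the same `onMatch`-style write leaves different values on the vertex depending on which cardinality is in effect, so relying on an implicit default changes the result. It does not claim to be the fix the PR should adopt.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CardinalityAwareUpdateSketch {

    /** Hypothetical mirror of the single/list/set cardinalities; not the TinkerPop enum itself. */
    enum Cardinality { single, list, set }

    /** Hypothetical in-memory vertex that stores all values written under a key. */
    static final class SimpleVertex {
        final Map<String, List<Object>> properties = new HashMap<>();

        void property(final Cardinality cardinality, final String key, final Object value) {
            final List<Object> values = properties.computeIfAbsent(key, k -> new ArrayList<>());
            switch (cardinality) {
                case single:
                    // replace whatever was stored before
                    values.clear();
                    values.add(value);
                    break;
                case list:
                    // always append, duplicates allowed
                    values.add(value);
                    break;
                case set:
                    // append only if the value is not already present
                    if (!values.contains(value)) values.add(value);
                    break;
            }
        }
    }

    /** Applies an onMatch-style map with a cardinality chosen by the caller rather than an implicit default. */
    static void applyOnMatch(final SimpleVertex v, final Map<String, Object> onMatch, final Cardinality cardinality) {
        onMatch.forEach((key, value) -> v.property(cardinality, key, value));
    }

    public static void main(final String[] args) {
        final Map<String, Object> onMatch = new HashMap<>();
        onMatch.put("name", "okram");

        for (final Cardinality c : Cardinality.values()) {
            final SimpleVertex v = new SimpleVertex();
            v.property(Cardinality.list, "name", "marko");
            applyOnMatch(v, onMatch, c);
            System.out.println(c + " -> " + v.properties.get("name"));
        }
    }
}
```

Under `single` the earlier value is replaced, while under `list` both values are kept, which is the difference a hard-coded default would hide.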

##########
File path: gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeEdgeStep.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeE()} step covering both the start step version and the one used mid-traversal.
+ */
+public class MergeEdgeStep<S> extends FlatMapStep<S, Edge> implements Mutating<Event>,
+        TraversalOptionParent<Merge, S, Edge> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart) {
+        this(traversal, isStart, new IdentityTraversal<>());
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean isStart, final Traversal.Admin<?,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, Edge> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into this step. this exception should not
+        // end up visible to users really
+    }
+
+    @Override
+    public Parameters getParameters() {
+        // merge doesn't take fold ups of property() calls. those need to get treated as regular old PropertyStep
+        // instances. not sure if this should support with() though.....none of the other Mutating steps do.
+        return null;
+    }
+
+    @Override
+    protected Traverser.Admin<Edge> processNextStart() {
+        // when it's a start step a traverser needs to be created to kick off the traversal.
+        if (isStart && first) {
+            first = false;
+            final TraverserGenerator generator = this.getTraversal().getTraverserGenerator();
+            this.addStart(generator.generate(false, (Step) this, 1L));
+        }
+        return super.processNextStart();
+    }
+
+    /**
+     * Use the {@code Map} of search criteria to most efficiently return a {@code Stream<Edge>} of matching elements.
+     * Providers might override this method when extending this step to provide their own optimized mechanisms for
+     * matching the list of edges. This implementation is only optimized for the {@link T#id} so any other usage
+     * will simply be in-memory filtering which could be slow.
+     */
+    protected Stream<Edge> createSearchStream(final Map<Object,Object> search) {
+        final Graph graph = this.getTraversal().getGraph().get();
+
+        final Optional<Direction> directionUsedInLookup;
+        Stream<Edge> stream;
+        // prioritize lookup by id, then use vertices as starting point if possible
+        if (search.containsKey(T.id)) {
+            stream = IteratorUtils.stream(graph.edges(search.get(T.id)));
+            directionUsedInLookup = Optional.empty();
+        } else if (search.containsKey(Direction.OUT)) {
+            stream = IteratorUtils.stream(graph.vertices(search.get(Direction.OUT))).flatMap(v -> IteratorUtils.stream(v.edges(Direction.OUT)));
+            directionUsedInLookup = Optional.of(Direction.OUT);
+        } else if (search.containsKey(Direction.IN)) {
+            stream = IteratorUtils.stream(graph.vertices(search.get(Direction.IN))).flatMap(v -> IteratorUtils.stream(v.edges(Direction.IN)));
+            directionUsedInLookup = Optional.of(Direction.IN);
+        } else {
+            stream = IteratorUtils.stream(graph.edges());
+            directionUsedInLookup = Optional.empty();
+        }
+
+        // in-memory filter is not going to be nice here. it will be up to graphs to optimize this step as they do
+        // for other Mutation steps
+        stream = stream.filter(e -> {
+            // try to match on all search criteria skipping T.id as it was handled above
+            return search.entrySet().stream().filter(

Review comment:
       nit
   
   This lambda could be extracted into a `Predicate`
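
   A minimal sketch of the extraction, under stated assumptions: `Token` and `Dir` are hypothetical stand-ins for TinkerPop's `T` and `Direction` so that the snippet compiles on the JDK alone, and `needsInMemoryMatch` and `countRemaining` are illustrative names that do not appear in the PR.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

class SearchKeyFilterSketch {
    // Hypothetical stand-ins for TinkerPop's T and Direction tokens.
    enum Token { ID, LABEL }
    enum Dir { OUT, IN }

    /**
     * Builds a named predicate that keeps only the search entries which still
     * need in-memory matching: everything except the id key and the direction
     * key that was already consumed by the initial lookup.
     */
    static Predicate<Map.Entry<Object, Object>> needsInMemoryMatch(final Optional<Dir> directionUsedInLookup) {
        return kv -> kv.getKey() != Token.ID
                && !(directionUsedInLookup.isPresent() && kv.getKey() == directionUsedInLookup.get());
    }

    /** Counts the entries that the in-memory filter would still have to check. */
    static long countRemaining(final Map<Object, Object> search, final Optional<Dir> directionUsedInLookup) {
        return search.entrySet().stream().filter(needsInMemoryMatch(directionUsedInLookup)).count();
    }
}
```

   With the predicate named, the stream pipeline in `createSearchStream` would read as `filter(needsInMemoryMatch(directionUsedInLookup)).allMatch(...)`, and the filter could be unit tested on its own.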

##########
File path: gremlin-test/features/map/MergeEdge.feature
##########
@@ -0,0 +1,201 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+@StepClassMap @StepMergeE
+Feature: Step - mergeE()

Review comment:
       Please add the following test scenarios (similar cases for mergeV as well):
   1. Uses g.mergeE() but does not specify OUT in the map
   2. Uses g.mergeE() but does not specify IN in the map
   3. Uses g.mergeE() but does not specify either OUT or IN in the map
   4. Uses g.mergeE() but does not specify LABEL in the map
   5. Uses g.V(1).mergeE() but specifies OUT in the map which is different than 1
   6. Wrong syntax for the map (this case could go in grammar test)
   7. lower case OUT/IN in the map (this case could go in grammar test)
   8. test for mergeE(traversal) --> (I am quite curious to see the use case for this scenario)
   9. mergeE() at the beginning of a child traversal
   10. mergeE() in the middle of a child traversal
   11. mergeV() with insertion of property having different cardinality (list/set/single)
   12. mergeV() when existing vertex has single cardinality but new vertex wants to add same property with set cardinality
   

##########
File path: 
gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeEdgeStep.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeE()} step covering both the start step 
version and the one used mid-traversal.
+ */
+public class MergeEdgeStep<S> extends FlatMapStep<S, Edge> implements 
Mutating<Event>,
+        TraversalOptionParent<Merge, S, Edge> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart) {
+        this(traversal, isStart, new IdentityTraversal<>());
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart, final Traversal.Admin<?,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, 
Edge> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s 
for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, 
E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) 
onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) 
onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into 
this step. this exception should not
+        // end up visible to users really
+    }
+
+    @Override
+    public Parameters getParameters() {
+        // merge doesn't take fold ups of property() calls. those need to get 
treated as regular old PropertyStep
+        // instances. not sure if this should support with() though.....none 
of the other Mutating steps do.
+        return null;
+    }
+
+    @Override
+    protected Traverser.Admin<Edge> processNextStart() {
+        // when it's a start step a traverser needs to be created to kick off 
the traversal.
+        if (isStart && first) {
+            first = false;
+            final TraverserGenerator generator = 
this.getTraversal().getTraverserGenerator();
+            this.addStart(generator.generate(false, (Step) this, 1L));
+        }
+        return super.processNextStart();
+    }
+
+    /**
+     * Use the {@code Map} of search criteria to most efficiently return a 
{@code Stream<Edge>} of matching elements.
+     * Providers might override this method when extending this step to 
provide their own optimized mechanisms for
+     * matching the list of edges. This implementation is only optimized for 
the {@link T#id} so any other usage
+     * will simply be in-memory filtering which could be slow.
+     */
+    protected Stream<Edge> createSearchStream(final Map<Object,Object> search) 
{
+        final Graph graph = this.getTraversal().getGraph().get();
+
+        final Optional<Direction> directionUsedInLookup;
+        Stream<Edge> stream;
+        // prioritize lookup by id, then use vertices as starting point if 
possible
+        if (search.containsKey(T.id)) {
+            stream = IteratorUtils.stream(graph.edges(search.get(T.id)));
+            directionUsedInLookup = Optional.empty();
+        } else if (search.containsKey(Direction.OUT)) {
+            stream = 
IteratorUtils.stream(graph.vertices(search.get(Direction.OUT))).flatMap(v -> 
IteratorUtils.stream(v.edges(Direction.OUT)));
+            directionUsedInLookup = Optional.of(Direction.OUT);
+        } else if (search.containsKey(Direction.IN)) {
+            stream = 
IteratorUtils.stream(graph.vertices(search.get(Direction.IN))).flatMap(v -> 
IteratorUtils.stream(v.edges(Direction.IN)));
+            directionUsedInLookup = Optional.of(Direction.IN);
+        } else {
+            stream = IteratorUtils.stream(graph.edges());
+            directionUsedInLookup = Optional.empty();
+        }
+
+        // in-memory filter is not going to be nice here. it will be up to 
graphs to optimize this step as they do
+        // for other Mutation steps
+        stream = stream.filter(e -> {
+            // try to match on all search criteria skipping T.id as it was 
handled above
+            return search.entrySet().stream().filter(
+                    kv -> kv.getKey() != T.id && 
!(directionUsedInLookup.isPresent() && kv.getKey() == 
directionUsedInLookup.get())).
+                    allMatch(kv -> {
+                        if (kv.getKey() == T.label) {
+                            return e.label().equals(kv.getValue());
+                        } else if (kv.getKey() instanceof Direction) {
+                            final Direction direction = (Direction) 
kv.getKey();
+
+                            // try to take advantage of string id conversions 
of the graph by doing a lookup rather
+                            // than direct compare on id
+                            final Iterator<Vertex> found = 
graph.vertices(kv.getValue());
+                            return found.hasNext() && 
e.vertices(direction).next().equals(found.next());

Review comment:
       Iterator leak here. Please close the iterator once it has been used.
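
   A rough sketch of the shape I have in mind, using only the JDK. `TrackedIterator` is a hypothetical stand-in for a provider iterator that holds a resource until it is closed, and `firstEquals` and `closeQuietly` are illustrative names, not part of the PR. In the real step, TinkerPop's `CloseableIterator.closeIterator(...)` helper could likely play the role of `closeQuietly`, assuming it is available on this branch.

```java
import java.util.Iterator;

class IteratorCloseSketch {
    /** Hypothetical stand-in for a graph iterator that must be closed after use. */
    static final class TrackedIterator<T> implements Iterator<T>, AutoCloseable {
        private final Iterator<T> delegate;
        private boolean closed = false;

        TrackedIterator(final Iterator<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public T next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Compares the first element of the iterator to the expected value and always closes the iterator. */
    static <T> boolean firstEquals(final Iterator<T> found, final T expected) {
        try {
            return found.hasNext() && expected.equals(found.next());
        } finally {
            closeQuietly(found);
        }
    }

    /** Closes the iterator if it holds a closeable resource; plain iterators are left alone. */
    static void closeQuietly(final Iterator<?> iterator) {
        if (iterator instanceof AutoCloseable) {
            try {
                ((AutoCloseable) iterator).close();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }
}
```

   The `try`/`finally` matters because the comparison short-circuits: the iterator has to be released whether or not `hasNext()` returns true.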

##########
File path: 
gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeEdgeStep.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeE()} step covering both the start step 
version and the one used mid-traversal.
+ */
+public class MergeEdgeStep<S> extends FlatMapStep<S, Edge> implements 
Mutating<Event>,
+        TraversalOptionParent<Merge, S, Edge> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart) {
+        this(traversal, isStart, new IdentityTraversal<>());
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeEdgeStep(final Traversal.Admin traversal, final boolean 
isStart, final Traversal.Admin<?,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, 
Edge> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s 
for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, 
E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) 
onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) 
onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into 
this step. this exception should not
+        // end up visible to users really
+    }
+
+    @Override
+    public Parameters getParameters() {
+        // merge doesn't take fold ups of property() calls. those need to get 
treated as regular old PropertyStep
+        // instances. not sure if this should support with() though.....none 
of the other Mutating steps do.
+        return null;
+    }
+
+    @Override
+    protected Traverser.Admin<Edge> processNextStart() {
+        // when it's a start step a traverser needs to be created to kick off 
the traversal.
+        if (isStart && first) {
+            first = false;
+            final TraverserGenerator generator = 
this.getTraversal().getTraverserGenerator();
+            this.addStart(generator.generate(false, (Step) this, 1L));
+        }
+        return super.processNextStart();
+    }
+
+    /**
+     * Use the {@code Map} of search criteria to most efficiently return a 
{@code Stream<Edge>} of matching elements.
+     * Providers might override this method when extending this step to 
provide their own optimized mechanisms for
+     * matching the list of edges. This implementation is only optimized for 
the {@link T#id} so any other usage
+     * will simply be in-memory filtering which could be slow.
+     */
+    protected Stream<Edge> createSearchStream(final Map<Object,Object> search) 
{

Review comment:
       +1

##########
File path: 
gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MergeVertexStep.java
##########
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Merge;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.TraversalOptionParent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.Parameters;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.CallbackRegistry;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.Event;
+import 
org.apache.tinkerpop.gremlin.process.traversal.step.util.event.ListCallbackRegistry;
+import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy;
+import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * Implementation for the {@code mergeV()} step covering both the start step 
version and the one used mid-traversal.
+ */
+public class MergeVertexStep<S> extends FlatMapStep<S, Vertex> implements 
Mutating<Event>,
+        TraversalOptionParent<Merge, S, Vertex> {
+
+    private final boolean isStart;
+    private boolean first = true;
+    private Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal;
+    private Traversal.Admin<S, Map<Object, Object>> onCreateTraversal = null;
+    private Traversal.Admin<S, Map<String, Object>> onMatchTraversal = null;
+
+    protected CallbackRegistry<Event> callbackRegistry;
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean 
isStart) {
+        this(traversal, isStart, new IdentityTraversal());
+    }
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean 
isStart, final Map<Object, Object> searchCreate) {
+        this(traversal, isStart, new ConstantTraversal<>(searchCreate));
+    }
+
+    public MergeVertexStep(final Traversal.Admin traversal, final boolean 
isStart, final Traversal.Admin<S,Map<Object, Object>> searchCreateTraversal) {
+        super(traversal);
+        this.isStart = isStart;
+        this.searchCreateTraversal = integrateChild(searchCreateTraversal);
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getSearchCreateTraversal() {
+        return searchCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<Object, Object>> getOnCreateTraversal() {
+        return onCreateTraversal;
+    }
+
+    public Traversal.Admin<S, Map<String, Object>> getOnMatchTraversal() {
+        return onMatchTraversal;
+    }
+
+    /**
+     * Determines if this is a start step.
+     */
+    public boolean isStart() {
+        return isStart;
+    }
+
+    public boolean isFirst() {
+        return first;
+    }
+
+    public CallbackRegistry<Event> getCallbackRegistry() {
+        return callbackRegistry;
+    }
+
+    @Override
+    public void addChildOption(final Merge token, final Traversal.Admin<S, 
Vertex> traversalOption) {
+        if (token == Merge.onCreate) {
+            this.onCreateTraversal = this.integrateChild(traversalOption);
+        } else if (token == Merge.onMatch) {
+            this.onMatchTraversal = this.integrateChild(traversalOption);
+        } else {
+            throw new UnsupportedOperationException(String.format("Option %s 
for Merge is not supported", token.name()));
+        }
+    }
+
+    @Override
+    public <S, E> List<Traversal.Admin<S, E>> getLocalChildren() {
+        final List<Traversal.Admin<S, E>> children = new ArrayList<>();
+        if (searchCreateTraversal != null) children.add((Traversal.Admin<S, 
E>) searchCreateTraversal);
+        if (onMatchTraversal != null) children.add((Traversal.Admin<S, E>) 
onMatchTraversal);
+        if (onCreateTraversal != null) children.add((Traversal.Admin<S, E>) 
onCreateTraversal);
+        return children;
+    }
+
+    @Override
+    public void configure(final Object... keyValues) {
+        // this is a Mutating step but property() should not be folded into 
this step. this exception should not
+        // end up visible to users really
+    }
+
+    @Override
+    public Parameters getParameters() {
+        // merge doesn't take fold ups of property() calls. those need to get 
treated as regular old PropertyStep
+        // instances. not sure if this should support with() though.....none 
of the other Mutating steps do.
+        return null;
+    }
+
+    @Override
+    protected Traverser.Admin<Vertex> processNextStart() {
+        // when it's a start step a traverser needs to be created to kick off 
the traversal.
+        if (isStart && first) {
+            first = false;
+            final TraverserGenerator generator = 
this.getTraversal().getTraverserGenerator();
+            this.addStart(generator.generate(false, (Step) this, 1L));
+        }
+        return super.processNextStart();
+    }
+
+    /**
+     * Use the {@code Map} of search criteria to most efficiently return a 
{@code Stream<Vertex>} of matching elements.
+     * Providers might override this method when extending this step to 
provide their own optimized mechanisms for
+     * matching the list of vertices. This implementation is only optimized 
for the {@link T#id} so any other usage
+     * will simply be in-memory filtering which could be slow.
+     */
+    protected Stream<Vertex> createSearchStream(final Map<Object,Object> 
search) {
+        final Graph graph = this.getTraversal().getGraph().get();
+
+        Stream<Vertex> stream;
+        // prioritize lookup by id
+        if (search.containsKey(T.id))
+            stream = IteratorUtils.stream(graph.vertices(search.get(T.id)));
+        else
+            stream = IteratorUtils.stream(graph.vertices());
+
+        // in-memory filter is not going to be nice here. it will be up to 
graphs to optimize this step as they do
+        // for other Mutation steps
+        stream = stream.filter(v -> {
+            // try to match on all search criteria skipping T.id as it was 
handled above
+            return search.entrySet().stream().filter(kv -> kv.getKey() != 
T.id).allMatch(kv -> {
+                if (kv.getKey() == T.label) {
+                    return v.label().equals(kv.getValue());
+                } else {
+                    final VertexProperty<Object> vp = 
v.property(kv.getKey().toString());
+                    return vp.isPresent() && kv.getValue().equals(vp.value());
+                }
+            });
+        });
+
+        return stream;
+    }
+
+    @Override
+    protected Iterator<Vertex> flatMap(final Traverser.Admin<S> traverser) {
+        final Map<Object,Object> searchCreate = TraversalUtil.apply(traverser, 
searchCreateTraversal);
+
+        Stream<Vertex> stream = createSearchStream(searchCreate);
+        stream = stream.map(v -> {
+            // if no onMatch is defined then there is no update - return the 
vertex unchanged
+            if (null == onMatchTraversal) return v;
+
+            // assume good input from GraphTraversal - folks might drop in a T 
here even though it is immutable
+            final Map<String, Object> onMatchMap = 
TraversalUtil.apply(traverser, onMatchTraversal);
+            onMatchMap.forEach((key, value) -> {
+                // trigger callbacks for eventing - in this case, it's a 
VertexPropertyChangedEvent. if there's no
+                // registry/callbacks then just set the property
+                if (this.callbackRegistry != null && 
!callbackRegistry.getCallbacks().isEmpty()) {

Review comment:
       I did not see any tests in this PR that cover the event stream for these cases.

##########
File path: gremlin-test/features/map/MergeEdge.feature
##########
@@ -0,0 +1,201 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+@StepClassMap @StepMergeE
+Feature: Step - mergeE()
+
+  Scenario: g_V_mergeEXlabel_self_weight_05X
+    Given the empty graph
+    And the graph initializer of
+      """
+      g.addV("person").property("name", "marko").property("age", 29)
+      """
+    And using the parameter xx1 defined as "m[{\"t[label]\": \"self\", 
\"weight\":\"d[0.5].d\"}]"
+    And the traversal of
+      """
+      g.V().mergeE(xx1)
+      """
+    When iterated to list
+    Then the result should have a count of 1
+    And the graph should return 1 for count of 
"g.E().hasLabel(\"self\").has(\"weight\",0.5)"
+
+  @UserSuppliedVertexIds
+  Scenario: g_mergeEXlabel_knows_out_marko_in_vadasX
+    Given the empty graph
+    And the graph initializer of
+      """
+      g.addV("person").property(T.id, 100).property("name", "marko").
+        addV("person").property(T.id, 101).property("name", "vadas")
+      """
+    And using the parameter xx1 defined as "m[{\"t[label]\": \"knows\", 
\"D[OUT]\":\"v[100]\", \"D[IN]\":\"v[101]\"}]"
+    And the traversal of
+      """
+      g.mergeE(xx1)
+      """
+    When iterated to list
+    Then the result should have a count of 1
+    And the graph should return 1 for count of 
"g.V().has(\"person\",\"name\",\"marko\").out(\"knows\").has(\"person\",\"name\",\"vadas\")"
+
+  @UserSuppliedVertexIds
+  Scenario: g_mergeEXlabel_knows_out_marko_in_vadas_weight_05X_exists
+    Given the empty graph
+    And the graph initializer of
+      """
+      g.addV("person").property(T.id, 100).property("name", "marko").as("a").
+        addV("person").property(T.id, 101).property("name", "vadas").as("b").
+        addE("knows").from("a").to("b")
+      """
+    And using the parameter xx1 defined as "m[{\"t[label]\": \"knows\", 
\"D[OUT]\":\"v[100]\", \"D[IN]\":\"v[101]\", \"weight\":\"d[0.5].d\"}]"
+    And the traversal of
+      """
+      g.mergeE(xx1)
+      """
+    When iterated to list
+    Then the result should have a count of 1
+    And the graph should return 1 for count of 
"g.V().has(\"person\",\"name\",\"marko\").outE(\"knows\").has(\"weight\",0.5).inV().has(\"person\",\"name\",\"vadas\")"
+    And the graph should return 2 for count of 
"g.V().has(\"person\",\"name\",\"marko\").outE(\"knows\").inV().has(\"person\",\"name\",\"vadas\")"

Review comment:
       nit
   
   outE().inV() could be replaced with out()




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


> Create merge() step to codify best practice for upsert pattern
> --------------------------------------------------------------
>
>                 Key: TINKERPOP-2681
>                 URL: https://issues.apache.org/jira/browse/TINKERPOP-2681
>             Project: TinkerPop
>          Issue Type: Improvement
>          Components: language, process
>    Affects Versions: 3.5.1
>            Reporter: Dave Bechberger
>            Assignee: Stephen Mallette
>            Priority: Major
>
> Create a step that codifies the best practice for the upsert functionality 
> into a single step to make it easier to use, more discoverable, and easier 
> for implementers to optimize.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
