This is an automated email from the ASF dual-hosted git repository.

kenhuuu pushed a commit to branch poc-bulkset-result
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git

commit 1482dea8901794b76296179316d184ab6eb00263
Author: Ken Hu <[email protected]>
AuthorDate: Fri Jul 26 16:19:32 2024 -0700

    initial attempt at returning results with a BulkSet
---
 .../driver/handler/GremlinResponseHandler.java        |  3 ++-
 .../org/apache/tinkerpop/gremlin/server/Context.java  | 10 +++++++---
 .../server/handler/HttpGremlinEndpointHandler.java    | 14 ++++++++------
 .../server/util/TextPlainMessageSerializerV4.java     |  6 +++---
 .../gremlin/util/message/ResponseMessageV4.java       |  8 +++++++-
 .../gremlin/util/message/ResponseResultV4.java        | 15 ++++++++++++---
 .../util/ser/AbstractGraphSONMessageSerializerV4.java |  5 +++--
 .../util/ser/GraphBinaryMessageSerializerV4.java      | 19 ++++++++++++-------
 .../util/ser/binary/ResponseMessageSerializerV4.java  |  3 ++-
 9 files changed, 56 insertions(+), 27 deletions(-)

diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
index 83203f964c..16fc2af3f3 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/GremlinResponseHandler.java
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.tinkerpop.gremlin.driver.Result;
 import org.apache.tinkerpop.gremlin.driver.ResultQueue;
 import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.apache.tinkerpop.gremlin.util.message.ResponseMessageV4;
 import org.apache.tinkerpop.gremlin.util.ser.SerializationException;
@@ -68,7 +69,7 @@ public class GremlinResponseHandler extends 
SimpleChannelInboundHandler<Response
         final ResultQueue queue = pending.get();
 
         if ((null == statusCode) || (statusCode == HttpResponseStatus.OK)) {
-            final List<Object> data = response.getResult().getData();
+            final BulkSet<Object> data = response.getResult().getData();
             // unrolls the collection into individual results to be handled by 
the queue.
             data.forEach(item -> queue.add(new Result(item)));
         } else {
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index d2589e2df7..836ec61a53 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.server;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
 import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptChecker;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import 
org.apache.tinkerpop.gremlin.process.traversal.traverser.util.AbstractTraverser;
 import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
 import org.apache.tinkerpop.gremlin.structure.Element;
@@ -30,6 +31,7 @@ import org.apache.tinkerpop.gremlin.util.TokensV4;
 import org.apache.tinkerpop.gremlin.util.message.RequestMessageV4;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -191,13 +193,15 @@ public class Context {
                 : TokensV4.MATERIALIZE_PROPERTIES_ALL;
     }
 
-    public void handleDetachment(final List<Object> aggregate) {
+    public void handleDetachment(final BulkSet<Object> aggregate) {
         if (!aggregate.isEmpty() && 
!this.getMaterializeProperties().equals(TokensV4.MATERIALIZE_PROPERTIES_ALL)) {
             final Object firstElement = aggregate.get(0);
 
             if (firstElement instanceof Element) {
-                for (int i = 0; i < aggregate.size(); i++)
-                    aggregate.set(i, ReferenceFactory.detach((Element) 
aggregate.get(i)));
+                for (Map.Entry<Object, Long> element : 
aggregate.asBulk().entrySet()) {
+                    aggregate.remove(element);
+                    aggregate.add(ReferenceFactory.detach((Element) 
element.getKey()), element.getValue());
+                }
             } else if (firstElement instanceof AbstractTraverser) {
                 for (final Object item : aggregate)
                     ((AbstractTraverser) item).detach();
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
index e3b34379a2..c7557ae70e 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java
@@ -41,6 +41,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Pop;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import 
org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
 import org.apache.tinkerpop.gremlin.server.Context;
 import org.apache.tinkerpop.gremlin.server.GraphManager;
@@ -382,7 +383,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         if (!itty.hasNext()) {
             ByteBuf chunk = null;
             try {
-                chunk = makeChunk(context, msg, serializer, new ArrayList<>(), 
false);
+                chunk = makeChunk(context, msg, serializer, new BulkSet<>(), 
false);
                 nettyContext.writeAndFlush(new DefaultHttpContent(chunk));
             } catch (Exception ex) {
                 // Bytebuf is a countable release - if it does not get written 
downstream
@@ -394,9 +395,10 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
         }
 
         // the batch size can be overridden by the request
-        final int resultIterationBatchSize = (Integer) 
msg.optionalField(TokensV4.ARGS_BATCH_SIZE)
-                .orElse(settings.resultIterationBatchSize);
-        List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
+//        final int resultIterationBatchSize = (Integer) 
msg.optionalField(TokensV4.ARGS_BATCH_SIZE)
+//                .orElse(settings.resultIterationBatchSize);
+        final int resultIterationBatchSize = 1024;
+        BulkSet<Object> aggregate = new BulkSet<>();
 
         // use an external control to manage the loop as opposed to just 
checking hasNext() in the while.  this
         // prevents situations where auto transactions create a new transaction 
after calls to commit() within
@@ -458,7 +460,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
                     try {
                         // only need to reset the aggregation list if there's 
more stuff to write
                         if (hasMore) {
-                            aggregate = new 
ArrayList<>(resultIterationBatchSize);
+                            aggregate = new BulkSet<>();
                         }
                     } catch (Exception ex) {
                         // Bytebuf is a countable release - if it does not get 
written downstream
@@ -507,7 +509,7 @@ public class HttpGremlinEndpointHandler extends 
SimpleChannelInboundHandler<Requ
     }
 
     private static ByteBuf makeChunk(final Context ctx, final RequestMessageV4 
msg,
-                                     final MessageSerializerV4<?> serializer, 
final List<Object> aggregate,
+                                     final MessageSerializerV4<?> serializer, 
final BulkSet<Object> aggregate,
                                      final boolean hasMore) throws Exception {
         try {
             final ChannelHandlerContext nettyContext = 
ctx.getChannelHandlerContext();
diff --git 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java
 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java
index dadd1eb468..4033fd35d7 100644
--- 
a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java
+++ 
b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/TextPlainMessageSerializerV4.java
@@ -45,13 +45,13 @@ public class TextPlainMessageSerializerV4 extends 
AbstractMessageSerializerV4<Fu
     @Override
     public ByteBuf serializeResponseAsBinary(final ResponseMessageV4 
responseMessage, final ByteBufAllocator allocator) {
         return (responseMessage.getStatus().getCode() == HttpResponseStatus.OK)
-                ? convertStringData(responseMessage.getResult().getData(), 
false, allocator)
+                ? convertStringData(responseMessage.getResult().getListData(), 
false, allocator)
                 : convertErrorString(responseMessage.getStatus().getMessage(), 
allocator);
     }
 
     @Override
     public ByteBuf writeHeader(ResponseMessageV4 responseMessage, 
ByteBufAllocator allocator) {
-        return convertStringData(responseMessage.getResult().getData(), false, 
allocator);
+        return convertStringData(responseMessage.getResult().getListData(), 
false, allocator);
     }
 
     @Override
@@ -61,7 +61,7 @@ public class TextPlainMessageSerializerV4 extends 
AbstractMessageSerializerV4<Fu
 
     @Override
     public ByteBuf writeFooter(ResponseMessageV4 responseMessage, 
ByteBufAllocator allocator) {
-        return convertStringData(responseMessage.getResult().getData(), true, 
allocator);
+        return convertStringData(responseMessage.getResult().getListData(), 
true, allocator);
     }
 
     @Override
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java
index 10f0c0183b..8f0bbc5dd9 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseMessageV4.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.util.message;
 
 import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -100,7 +101,7 @@ public final class ResponseMessageV4 {
 
     public final static class Builder {
         private HttpResponseStatus code = null;
-        private List<Object> result = Collections.emptyList();
+        private BulkSet<Object> result = new BulkSet<>();
         private String statusMessage = null;
         private String exception = null;
         private Map<String, Object> attributes = Collections.emptyMap();
@@ -129,6 +130,11 @@ public final class ResponseMessageV4 {
         }
 
         public Builder result(final List<Object> result) {
+            result.stream().forEach(res -> this.result.add(res));
+            return this;
+        }
+
+        public Builder result(final BulkSet<Object> result) {
             this.result = result;
             return this;
         }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java
index 1711a3c0bc..daa8ec1e86 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/message/ResponseResultV4.java
@@ -18,6 +18,9 @@
  */
 package org.apache.tinkerpop.gremlin.util.message;
 
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -25,18 +28,24 @@ import java.util.Map;
  * Data model for the "result" portion of a {@link ResponseMessageV4}.
  */
 public final class ResponseResultV4 {
-    private final List<Object> data;
+    private final BulkSet<Object> data;
     private final Map<String, Object> meta;
 
-    public ResponseResultV4(final List<Object> data, final Map<String, Object> 
meta) {
+    public ResponseResultV4(final BulkSet<Object> data, final Map<String, 
Object> meta) {
         this.data = data;
         this.meta = meta;
     }
 
-    public List<Object> getData() {
+    public BulkSet<Object> getData() {
         return data;
     }
 
+    public List<Object> getListData() {
+        List results = new ArrayList(data.size());
+        data.stream().forEach(res -> results.add(res));
+        return results;
+    }
+
     public Map<String, Object> getMeta() {
         return meta;
     }
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
index 4a216dd50e..dec168fcc6 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/AbstractGraphSONMessageSerializerV4.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.util.ReferenceCountUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import 
org.apache.tinkerpop.gremlin.structure.io.graphson.AbstractObjectDeserializer;
 import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
@@ -378,7 +379,7 @@ public abstract class AbstractGraphSONMessageSerializerV4 
extends AbstractMessag
             GraphSONUtil.writeEndObject(responseMessage, jsonGenerator, 
typeSerializer);
 
             jsonGenerator.writeFieldName(SerTokensV4.TOKEN_RESULT);
-            final List<Object> result = responseMessage.getResult().getData();
+            final List<Object> result = 
responseMessage.getResult().getListData();
             if (result != null) {
                 serializerProvider.findTypedValueSerializer(result.getClass(), 
true, null).serialize(result, jsonGenerator, serializerProvider);
             } else {
@@ -476,7 +477,7 @@ public abstract class AbstractGraphSONMessageSerializerV4 
extends AbstractMessag
             final Map<String, Object> status = (Map<String, Object>) 
data.get(SerTokensV4.TOKEN_STATUS);
             ResponseMessageV4.Builder response = ResponseMessageV4.build()
                     .code(HttpResponseStatus.valueOf((Integer) 
status.get(SerTokensV4.TOKEN_CODE)))
-                    .result((List) data.get(SerTokensV4.TOKEN_RESULT));
+                    .result((BulkSet<Object>) 
data.get(SerTokensV4.TOKEN_RESULT));
 
             if (null != status.get(SerTokensV4.TOKEN_EXCEPTION)) {
                 
response.exception(String.valueOf(status.get(SerTokensV4.TOKEN_EXCEPTION)));
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
index d98c7cfd85..0949965ffd 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/GraphBinaryMessageSerializerV4.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.util.ser;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.io.Buffer;
 import org.apache.tinkerpop.gremlin.structure.io.IoRegistry;
@@ -232,8 +233,9 @@ public class GraphBinaryMessageSerializerV4 extends 
AbstractMessageSerializerV4<
                         ? responseMessage.getResult().getData()
                         : aggregate;
                 if (data != null) {
-                    for (final Object item : (List) data) {
-                        writer.write(item, buffer);
+                    for (final Map.Entry<Object, Long> item : 
((BulkSet<Object>) data).asBulk().entrySet()) {
+                        writer.write(item.getKey(), buffer);
+                        writer.write(item.getValue(), buffer);
                     }
                 }
             }
@@ -263,14 +265,17 @@ public class GraphBinaryMessageSerializerV4 extends 
AbstractMessageSerializerV4<
         return readChunk(msg, true);
     }
 
-    private List<Object> readPayload(final Buffer buffer) throws IOException {
-        final List<Object> result = new ArrayList<>();
+    private BulkSet<Object> readPayload(final Buffer buffer) throws 
IOException {
+        final BulkSet<Object> result = new BulkSet<>();
         while (buffer.readableBytes() != 0) {
             final Object obj = reader.read(buffer);
+
             if (Marker.END_OF_STREAM.equals(obj)) {
                 break;
             }
-            result.add(obj);
+
+            final Long bulk = reader.read(buffer);
+            result.add(obj, bulk);
         }
         return result;
     }
@@ -290,7 +295,7 @@ public class GraphBinaryMessageSerializerV4 extends 
AbstractMessageSerializerV4<
         try {
             // empty input buffer
             if (buffer.readableBytes() == 0) {
-                return 
ResponseMessageV4.build().result(Collections.emptyList()).create();
+                return ResponseMessageV4.build().result(new 
BulkSet<>()).create();
             }
 
             if (isFirstChunk) {
@@ -303,7 +308,7 @@ public class GraphBinaryMessageSerializerV4 extends 
AbstractMessageSerializerV4<
                 }
             }
 
-            final List<Object> result = readPayload(buffer);
+            final BulkSet<Object> result = readPayload(buffer);
 
             // no footer
             if (buffer.readableBytes() == 0) {
diff --git 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java
 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java
index fd5e600cf0..b6b078cb85 100644
--- 
a/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java
+++ 
b/gremlin-util/src/main/java/org/apache/tinkerpop/gremlin/util/ser/binary/ResponseMessageSerializerV4.java
@@ -20,6 +20,7 @@ package org.apache.tinkerpop.gremlin.util.ser.binary;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.structure.io.Buffer;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryReader;
 import org.apache.tinkerpop.gremlin.structure.io.binary.GraphBinaryWriter;
@@ -52,7 +53,7 @@ public class ResponseMessageSerializerV4 {
                     .statusMessage(context.readValue(buffer, String.class, 
true))
                     .statusAttributes(context.readValue(buffer, Map.class, 
false))
                     .responseMetaData(context.readValue(buffer, Map.class, 
false))
-                    .result(context.read(buffer))
+                    .result(context.<BulkSet<Object>>read(buffer))
                     .create();
         } catch (IOException ex) {
             throw new SerializationException(ex);

Reply via email to