lidavidm commented on code in PR #4:
URL: https://github.com/apache/arrow-experiments/pull/4#discussion_r1515139530


##########
http/get_simple/java/server/src/main/java/com/example/ArrowHttpServer.java:
##########
@@ -0,0 +1,131 @@
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+import jakarta.servlet.ServletException;
+import jakarta.servlet.http.HttpServlet;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+public class ArrowHttpServer extends AbstractHandler {
+
+    static BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); // root allocator created with a limit of Long.MAX_VALUE; used by getPutData()
+
+    static Schema schema = new Schema( // four nullable 64-bit signed integer columns named a, b, c and d
+        List.of(
+            new Field("a", FieldType.nullable(new ArrowType.Int(64, true)), 
null),
+            new Field("b", FieldType.nullable(new ArrowType.Int(64, true)), 
null),
+            new Field("c", FieldType.nullable(new ArrowType.Int(64, true)), 
null),
+            new Field("d", FieldType.nullable(new ArrowType.Int(64, true)), 
null)
+        ));
+
+    static List<ArrowRecordBatch> batches; // not assigned in the visible part of this file; presumably filled from getPutData() - TODO confirm
+
+    static Random random = new Random(); // unseeded, so the generated column bytes differ from run to run
+
+    /**
+     * Builds the in-memory list of record batches used as the response data.
+     *
+     * <p>A single {@link VectorSchemaRoot} of 4096 rows is populated once: every
+     * column of {@code schema} receives random bytes as its data and an all-ones
+     * validity bitmap. That root is then unloaded repeatedly until 100,000,000
+     * rows have been produced, so every batch carries the same column contents.
+     * When fewer than 4096 rows remain, the final batch is taken from a slice of
+     * the root holding only the remaining rows.
+     *
+     * @return the generated record batches, in emission order
+     */
+    public static List<ArrowRecordBatch> getPutData() {
+        final int totalRecords = 100000000;
+        final int length = 4096;
+
+        List<ArrowRecordBatch> result = new ArrayList<>();
+
+        try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+            // Populate each column exactly once; the loop below reuses this root.
+            for (Field field : schema.getFields()) {
+                // 8 bytes per 64-bit value.
+                byte[] payload = new byte[length * 8];
+                random.nextBytes(payload);
+
+                // One validity bit per row, all set.
+                byte[] allValid = new byte[length / 8];
+                Arrays.fill(allValid, (byte) 0xFF);
+
+                BigIntVector column = (BigIntVector) root.getVector(field.getName());
+                column.allocateNew(length);
+                column.setValueCount(length);
+                column.getDataBuffer().setBytes(0, payload);
+                column.getValidityBuffer().setBytes(0, allValid);
+                root.setRowCount(length);
+            }
+
+            // Count down the rows still to be emitted.
+            for (int remaining = totalRecords; remaining > 0; ) {
+                if (remaining >= length) {
+                    result.add(new VectorUnloader(root).getRecordBatch());
+                    remaining -= length;
+                } else {
+                    // Short final batch: unload only the leading rows still needed.
+                    try (VectorSchemaRoot tail = root.slice(0, remaining)) {
+                        result.add(new VectorUnloader(tail).getRecordBatch());
+                    }
+                    remaining = 0;
+                }
+            }
+        }
+
+        return result;
+    }
+
+    public void handle(String target, Request baseRequest, HttpServletRequest 
request, HttpServletResponse response)
+            throws IOException, ServletException {
+
+        response.setContentType("application/vnd.apache.arrow.stream");
+        response.setStatus(HttpServletResponse.SC_OK);
+

Review Comment:
   I think effectively in Arrow you have to actually write the data to get the 
final length (since you have to construct the metadata, pad buffers, etc.). For 
this example that's doable since it's fixed data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to