[
https://issues.apache.org/jira/browse/ARROW-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17496104#comment-17496104
]
Alessandro Molina commented on ARROW-15703:
-------------------------------------------
I think we should defer decision this until we have gathered more feedbacks on
the Cookbook.
Unless there are specific reasons to keep it open I'll close this ticket as
won't fix next week.
> [Java]: Create custom sphinx plugin to help us with java verbose code to
> showcase highlighting code
> ---------------------------------------------------------------------------------------------------
>
> Key: ARROW-15703
> URL: https://issues.apache.org/jira/browse/ARROW-15703
> Project: Apache Arrow
> Issue Type: Sub-task
> Reporter: David Dali Susanibar Arce
> Assignee: David Dali Susanibar Arce
> Priority: Minor
>
> We are running java cookbook code thru sphinx using our custom extension
> [https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py]
>
> We need to create another extension to only show our end user the java code
> that is needed to showcase but running the whole java code cookbook at
> testing part.
>
> Current documentation:
> {code:java}
> Validate Delete Data
> ********************
> And confirm that it's been deleted:
> .. testcode::
> import org.apache.arrow.flight.Action;
> import org.apache.arrow.flight.AsyncPutListener;
> import org.apache.arrow.flight.Criteria;
> import org.apache.arrow.flight.FlightClient;
> import org.apache.arrow.flight.FlightDescriptor;
> import org.apache.arrow.flight.FlightEndpoint;
> import org.apache.arrow.flight.FlightInfo;
> import org.apache.arrow.flight.FlightServer;
> import org.apache.arrow.flight.FlightStream;
> import org.apache.arrow.flight.Location;
> import org.apache.arrow.flight.NoOpFlightProducer;
> import org.apache.arrow.flight.PutResult;
> import org.apache.arrow.flight.Result;
> import org.apache.arrow.flight.Ticket;
> import org.apache.arrow.memory.RootAllocator;
> import org.apache.arrow.vector.VarCharVector;
> import org.apache.arrow.vector.VectorSchemaRoot;
> import org.apache.arrow.vector.VectorUnloader;
> import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
> import org.apache.arrow.vector.types.pojo.ArrowType;
> import org.apache.arrow.vector.types.pojo.Field;
> import org.apache.arrow.vector.types.pojo.FieldType;
> import org.apache.arrow.vector.types.pojo.Schema;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> class DataInMemory {
> private List<ArrowRecordBatch> listArrowRecordBatch;
> private Schema schema;
> private Long rows;
> public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch,
> Schema schema, Long rows) {
> this.listArrowRecordBatch = listArrowRecordBatch;
> this.schema = schema;
> this.rows = rows;
> }
> public List<ArrowRecordBatch> getListArrowRecordBatch() {
> return listArrowRecordBatch;
> }
> public Schema getSchema() {
> return schema;
> }
> public Long getRows() {
> return rows;
> }
> }
> // Server
> Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
> Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
> Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
> List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
> try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
> FlightServer flightServer = FlightServer.builder(allocator, location,
> new NoOpFlightProducer(){
> @Override
> public Runnable acceptPut(CallContext context, FlightStream
> flightStream, StreamListener<PutResult> ackStream) {
> return () -> {
> long rows = 0;
> while (flightStream.next()) {
> VectorUnloader unloader = new
> VectorUnloader(flightStream.getRoot());
> try (final ArrowRecordBatch arb =
> unloader.getRecordBatch()) {
> // Retain data information
> listArrowRecordBatch.add(arb);
> rows = rows +
> flightStream.getRoot().getRowCount();
> }
> }
> long finalRows = rows;
> DataInMemory pojoFlightDataInMemory = new
> DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
> dataInMemory.put(flightStream.getDescriptor(),
> pojoFlightDataInMemory);
> ackStream.onCompleted();
> };
> }
> @Override
> public void doAction(CallContext context, Action action,
> StreamListener<Result> listener) {
> FlightDescriptor flightDescriptor = FlightDescriptor.path(new
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for
> key configured
> if(dataInMemory.containsKey(flightDescriptor)) {
> switch (action.getType()) {
> case "DELETE":
> dataInMemory.remove(flightDescriptor);
> Result result = new Result("Delete
> completed".getBytes(StandardCharsets.UTF_8));
> listener.onNext(result);
> }
> listener.onCompleted();
> }
> }
> @Override
> public FlightInfo getFlightInfo(CallContext context,
> FlightDescriptor descriptor) {
> if(!dataInMemory.containsKey(descriptor)){
> throw new IllegalStateException("Unknown descriptor.");
> }
> return new FlightInfo(
> dataInMemory.get(descriptor).getSchema(),
> descriptor,
> Collections.singletonList(new FlightEndpoint(new
> Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)),
> location)), // Configure a key to map back and forward your data using Ticket
> argument
> allocator.getAllocatedMemory(),
> dataInMemory.get(descriptor).getRows()
> );
> }
> @Override
> public void listFlights(CallContext context, Criteria criteria,
> StreamListener<FlightInfo> listener) {
> dataInMemory.forEach((k, v) -> {
> FlightInfo flightInfo = getFlightInfo(null, k);
> listener.onNext(flightInfo);
> }
> );
> listener.onCompleted();
> }
> }).build();
> try {
> flightServer.start();
> } catch (IOException e) {
> e.printStackTrace();
> }
> }
> // Client
> try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
> // Populate data
> FlightClient flightClient = FlightClient.builder(allocator,
> location).build();
> Schema schema = new Schema(Arrays.asList( new Field("name",
> FieldType.nullable(new ArrowType.Utf8()), null)));
> VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema,
> allocator);
> VarCharVector varCharVector = (VarCharVector)
> vectorSchemaRoot.getVector("name");
> varCharVector.allocateNew(3);
> varCharVector.set(0, "Ronald".getBytes());
> varCharVector.set(1, "David".getBytes());
> varCharVector.set(2, "Francisco".getBytes());
> varCharVector.setValueCount(3);
> vectorSchemaRoot.setRowCount(3);
> FlightClient.ClientStreamListener listener =
> flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot,
> new AsyncPutListener());
> listener.putNext();
> vectorSchemaRoot.allocateNew();
> varCharVector.set(0, "Manuel".getBytes());
> varCharVector.set(1, "Felipe".getBytes());
> varCharVector.set(2, "JJ".getBytes());
> varCharVector.setValueCount(3);
> vectorSchemaRoot.setRowCount(3);
> listener.putNext();
> vectorSchemaRoot.clear();
> listener.completed();
> listener.getResult();
> // Do delete action
> Iterator<Result> deleteActionResult = flightClient.doAction(new
> Action("DELETE",
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
> ));
> while(deleteActionResult.hasNext()){
> Result result = deleteActionResult.next();
> System.out.println("Do Delete Action: " + new
> String(result.getBody(), StandardCharsets.UTF_8));
> }
> // Get all metadata information
> Iterable<FlightInfo> flightInfos =
> flightClient.listFlights(Criteria.ALL);
> flightInfos.forEach(t -> System.out.println(t));
> System.out.println("List Flights Info (after delete): No records");
> }
> .. testoutput::
> Do Delete Action: Delete completed
> List Flights Info (after delete): No records {code}
>
> How it could be:
> Only offer to the user the main code but running behind scene all the code
> needed
> {code:java}
> // Server
> @Override
> public void doAction(CallContext context, Action action,
> StreamListener<Result> listener) {
> FlightDescriptor flightDescriptor = FlightDescriptor.path(new
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for
> key configured
> if(dataInMemory.containsKey(flightDescriptor)) {
> switch (action.getType()) {
> case "DELETE":
> dataInMemory.remove(flightDescriptor);
> Result result = new Result("Delete
> completed".getBytes(StandardCharsets.UTF_8));
> listener.onNext(result);
> }
> listener.onCompleted();
> }
> }
> // Client
> // Do delete action
> Iterator<Result> deleteActionResult = flightClient.doAction(new
> Action("DELETE",
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
> ));
> while(deleteActionResult.hasNext()){
> Result result = deleteActionResult.next();
> System.out.println("Do Delete Action: " + new String(result.getBody(),
> StandardCharsets.UTF_8));
> }
> // Get all metadata information
> Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
> flightInfos.forEach(t -> System.out.println(t));
> System.out.println("List Flights Info (after delete): No records");
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)