[ 
https://issues.apache.org/jira/browse/ARROW-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Dali Susanibar Arce closed ARROW-15703.
---------------------------------------------
    Resolution: Later

> [Java]: Create custom sphinx plugin to help us with java verbose code to 
> showcase highlighting code
> ---------------------------------------------------------------------------------------------------
>
>                 Key: ARROW-15703
>                 URL: https://issues.apache.org/jira/browse/ARROW-15703
>             Project: Apache Arrow
>          Issue Type: Sub-task
>            Reporter: David Dali Susanibar Arce
>            Assignee: David Dali Susanibar Arce
>            Priority: Minor
>
> We are running java cookbook code thru sphinx using our custom extension 
> [https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py] 
>  
> We need to create another extension to only show our end user the java code 
> that is needed to showcase but running the whole java code cookbook at 
> testing part.
>  
> Current documentation:
> {code:java}
> Validate Delete Data
> ********************
> And confirm that it's been deleted:
> .. testcode::
>     import org.apache.arrow.flight.Action;
>     import org.apache.arrow.flight.AsyncPutListener;
>     import org.apache.arrow.flight.Criteria;
>     import org.apache.arrow.flight.FlightClient;
>     import org.apache.arrow.flight.FlightDescriptor;
>     import org.apache.arrow.flight.FlightEndpoint;
>     import org.apache.arrow.flight.FlightInfo;
>     import org.apache.arrow.flight.FlightServer;
>     import org.apache.arrow.flight.FlightStream;
>     import org.apache.arrow.flight.Location;
>     import org.apache.arrow.flight.NoOpFlightProducer;
>     import org.apache.arrow.flight.PutResult;
>     import org.apache.arrow.flight.Result;
>     import org.apache.arrow.flight.Ticket;
>     import org.apache.arrow.memory.RootAllocator;
>     import org.apache.arrow.vector.VarCharVector;
>     import org.apache.arrow.vector.VectorSchemaRoot;
>     import org.apache.arrow.vector.VectorUnloader;
>     import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
>     import org.apache.arrow.vector.types.pojo.ArrowType;
>     import org.apache.arrow.vector.types.pojo.Field;
>     import org.apache.arrow.vector.types.pojo.FieldType;
>     import org.apache.arrow.vector.types.pojo.Schema;
>     import java.io.IOException;
>     import java.nio.charset.StandardCharsets;
>     import java.util.ArrayList;
>     import java.util.Arrays;
>     import java.util.Collections;
>     import java.util.HashMap;
>     import java.util.Iterator;
>     import java.util.List;
>     import java.util.Map;
>     class DataInMemory {
>         private List<ArrowRecordBatch> listArrowRecordBatch;
>         private Schema schema;
>         private Long rows;
>         public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, 
> Schema schema, Long rows) {
>             this.listArrowRecordBatch = listArrowRecordBatch;
>             this.schema = schema;
>             this.rows = rows;
>         }
>         public List<ArrowRecordBatch> getListArrowRecordBatch() {
>             return listArrowRecordBatch;
>         }
>         public Schema getSchema() {
>             return schema;
>         }
>         public Long getRows() {
>             return rows;
>         }
>     }
>     // Server
>     Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
>     Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
>     Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
>     List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
>     try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
>         FlightServer flightServer = FlightServer.builder(allocator, location, 
> new NoOpFlightProducer(){
>             @Override
>             public Runnable acceptPut(CallContext context, FlightStream 
> flightStream, StreamListener<PutResult> ackStream) {
>                 return () -> {
>                     long rows = 0;
>                     while (flightStream.next()) {
>                         VectorUnloader unloader = new 
> VectorUnloader(flightStream.getRoot());
>                         try (final ArrowRecordBatch arb = 
> unloader.getRecordBatch()) {
>                             // Retain data information
>                             listArrowRecordBatch.add(arb);
>                             rows = rows + 
> flightStream.getRoot().getRowCount();
>                         }
>                     }
>                     long finalRows = rows;
>                     DataInMemory pojoFlightDataInMemory = new 
> DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
>                     dataInMemory.put(flightStream.getDescriptor(), 
> pojoFlightDataInMemory);
>                     ackStream.onCompleted();
>                 };
>             }
>             @Override
>             public void doAction(CallContext context, Action action, 
> StreamListener<Result> listener) {
>                 FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for 
> key configured
>                 if(dataInMemory.containsKey(flightDescriptor)) {
>                     switch (action.getType()) {
>                         case "DELETE":
>                             dataInMemory.remove(flightDescriptor);
>                             Result result = new Result("Delete 
> completed".getBytes(StandardCharsets.UTF_8));
>                             listener.onNext(result);
>                     }
>                     listener.onCompleted();
>                 }
>             }
>             @Override
>             public FlightInfo getFlightInfo(CallContext context, 
> FlightDescriptor descriptor) {
>                 if(!dataInMemory.containsKey(descriptor)){
>                     throw new IllegalStateException("Unknown descriptor.");
>                 }
>                 return new FlightInfo(
>                         dataInMemory.get(descriptor).getSchema(),
>                         descriptor,
>                         Collections.singletonList(new FlightEndpoint(new 
> Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)), 
> location)), // Configure a key to map back and forward your data using Ticket 
> argument
>                         allocator.getAllocatedMemory(),
>                         dataInMemory.get(descriptor).getRows()
>                 );
>             }
>             @Override
>             public void listFlights(CallContext context, Criteria criteria, 
> StreamListener<FlightInfo> listener) {
>                 dataInMemory.forEach((k, v) -> {
>                     FlightInfo flightInfo = getFlightInfo(null, k);
>                     listener.onNext(flightInfo);
>                     }
>                 );
>                 listener.onCompleted();
>             }
>         }).build();
>         try {
>             flightServer.start();
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>     }
>     // Client
>     try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
>         // Populate data
>         FlightClient flightClient = FlightClient.builder(allocator, 
> location).build();
>         Schema schema = new Schema(Arrays.asList( new Field("name", 
> FieldType.nullable(new ArrowType.Utf8()), null)));
>         VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema, 
> allocator);
>         VarCharVector varCharVector = (VarCharVector) 
> vectorSchemaRoot.getVector("name");
>         varCharVector.allocateNew(3);
>         varCharVector.set(0, "Ronald".getBytes());
>         varCharVector.set(1, "David".getBytes());
>         varCharVector.set(2, "Francisco".getBytes());
>         varCharVector.setValueCount(3);
>         vectorSchemaRoot.setRowCount(3);
>         FlightClient.ClientStreamListener listener = 
> flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot, 
> new AsyncPutListener());
>         listener.putNext();
>         vectorSchemaRoot.allocateNew();
>         varCharVector.set(0, "Manuel".getBytes());
>         varCharVector.set(1, "Felipe".getBytes());
>         varCharVector.set(2, "JJ".getBytes());
>         varCharVector.setValueCount(3);
>         vectorSchemaRoot.setRowCount(3);
>         listener.putNext();
>         vectorSchemaRoot.clear();
>         listener.completed();
>         listener.getResult();
>         // Do delete action
>         Iterator<Result> deleteActionResult = flightClient.doAction(new 
> Action("DELETE", 
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
>  ));
>         while(deleteActionResult.hasNext()){
>             Result result = deleteActionResult.next();
>             System.out.println("Do Delete Action: " + new 
> String(result.getBody(), StandardCharsets.UTF_8));
>         }
>         // Get all metadata information
>         Iterable<FlightInfo> flightInfos = 
> flightClient.listFlights(Criteria.ALL);
>         flightInfos.forEach(t -> System.out.println(t));
>         System.out.println("List Flights Info (after delete): No records");
>     }
> .. testoutput::
>     Do Delete Action: Delete completed
>     List Flights Info (after delete): No records {code}
>  
> How it could be:
> Only offer to the user the main code but running behind scene all the code 
> needed
> {code:java}
> // Server
> @Override
> public void doAction(CallContext context, Action action, 
> StreamListener<Result> listener) {
>     FlightDescriptor flightDescriptor = FlightDescriptor.path(new 
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for 
> key configured
>     if(dataInMemory.containsKey(flightDescriptor)) {
>         switch (action.getType()) {
>             case "DELETE":
>                 dataInMemory.remove(flightDescriptor);
>                 Result result = new Result("Delete 
> completed".getBytes(StandardCharsets.UTF_8));
>                 listener.onNext(result);
>         }
>         listener.onCompleted();
>     }
> }
> // Client
> // Do delete action
> Iterator<Result> deleteActionResult = flightClient.doAction(new 
> Action("DELETE", 
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
>  ));
> while(deleteActionResult.hasNext()){
>     Result result = deleteActionResult.next();
>     System.out.println("Do Delete Action: " + new String(result.getBody(), 
> StandardCharsets.UTF_8));
> }
> // Get all metadata information
> Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
> flightInfos.forEach(t -> System.out.println(t));
> System.out.println("List Flights Info (after delete): No records");
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to