[
https://issues.apache.org/jira/browse/ARROW-15703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Dali Susanibar Arce closed ARROW-15703.
---------------------------------------------
Resolution: Later
> [Java]: Create custom sphinx plugin to help us with java verbose code to
> showcase highlighting code
> ---------------------------------------------------------------------------------------------------
>
> Key: ARROW-15703
> URL: https://issues.apache.org/jira/browse/ARROW-15703
> Project: Apache Arrow
> Issue Type: Sub-task
> Reporter: David Dali Susanibar Arce
> Assignee: David Dali Susanibar Arce
> Priority: Minor
>
> We are running the Java cookbook code through Sphinx using our custom
> extension:
> [https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py]
>
> We need to create another extension that shows end users only the Java code
> needed to illustrate a recipe, while still running the whole Java cookbook
> code during testing.
>
> Current documentation:
> {code:java}
> Validate Delete Data
> ********************
> And confirm that it's been deleted:
> .. testcode::
> import org.apache.arrow.flight.Action;
> import org.apache.arrow.flight.AsyncPutListener;
> import org.apache.arrow.flight.Criteria;
> import org.apache.arrow.flight.FlightClient;
> import org.apache.arrow.flight.FlightDescriptor;
> import org.apache.arrow.flight.FlightEndpoint;
> import org.apache.arrow.flight.FlightInfo;
> import org.apache.arrow.flight.FlightServer;
> import org.apache.arrow.flight.FlightStream;
> import org.apache.arrow.flight.Location;
> import org.apache.arrow.flight.NoOpFlightProducer;
> import org.apache.arrow.flight.PutResult;
> import org.apache.arrow.flight.Result;
> import org.apache.arrow.flight.Ticket;
> import org.apache.arrow.memory.RootAllocator;
> import org.apache.arrow.vector.VarCharVector;
> import org.apache.arrow.vector.VectorSchemaRoot;
> import org.apache.arrow.vector.VectorUnloader;
> import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
> import org.apache.arrow.vector.types.pojo.ArrowType;
> import org.apache.arrow.vector.types.pojo.Field;
> import org.apache.arrow.vector.types.pojo.FieldType;
> import org.apache.arrow.vector.types.pojo.Schema;
> import java.io.IOException;
> import java.nio.charset.StandardCharsets;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Map;
> class DataInMemory {
> private List<ArrowRecordBatch> listArrowRecordBatch;
> private Schema schema;
> private Long rows;
> public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch,
> Schema schema, Long rows) {
> this.listArrowRecordBatch = listArrowRecordBatch;
> this.schema = schema;
> this.rows = rows;
> }
> public List<ArrowRecordBatch> getListArrowRecordBatch() {
> return listArrowRecordBatch;
> }
> public Schema getSchema() {
> return schema;
> }
> public Long getRows() {
> return rows;
> }
> }
> // Server
> Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
> Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
> Map<String, DataInMemory> mapPojoFlightDataInMemory = new HashMap<>();
> List<ArrowRecordBatch> listArrowRecordBatch = new ArrayList<>();
> try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
> FlightServer flightServer = FlightServer.builder(allocator, location,
> new NoOpFlightProducer(){
> @Override
> public Runnable acceptPut(CallContext context, FlightStream
> flightStream, StreamListener<PutResult> ackStream) {
> return () -> {
> long rows = 0;
> while (flightStream.next()) {
> VectorUnloader unloader = new
> VectorUnloader(flightStream.getRoot());
> try (final ArrowRecordBatch arb =
> unloader.getRecordBatch()) {
> // Retain data information
> listArrowRecordBatch.add(arb);
> rows = rows +
> flightStream.getRoot().getRowCount();
> }
> }
> long finalRows = rows;
> DataInMemory pojoFlightDataInMemory = new
> DataInMemory(listArrowRecordBatch, flightStream.getSchema(), finalRows);
> dataInMemory.put(flightStream.getDescriptor(),
> pojoFlightDataInMemory);
> ackStream.onCompleted();
> };
> }
> @Override
> public void doAction(CallContext context, Action action,
> StreamListener<Result> listener) {
> FlightDescriptor flightDescriptor = FlightDescriptor.path(new
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for
> key configured
> if(dataInMemory.containsKey(flightDescriptor)) {
> switch (action.getType()) {
> case "DELETE":
> dataInMemory.remove(flightDescriptor);
> Result result = new Result("Delete
> completed".getBytes(StandardCharsets.UTF_8));
> listener.onNext(result);
> }
> listener.onCompleted();
> }
> }
> @Override
> public FlightInfo getFlightInfo(CallContext context,
> FlightDescriptor descriptor) {
> if(!dataInMemory.containsKey(descriptor)){
> throw new IllegalStateException("Unknown descriptor.");
> }
> return new FlightInfo(
> dataInMemory.get(descriptor).getSchema(),
> descriptor,
> Collections.singletonList(new FlightEndpoint(new
> Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8)),
> location)), // Configure a key to map back and forward your data using Ticket
> argument
> allocator.getAllocatedMemory(),
> dataInMemory.get(descriptor).getRows()
> );
> }
> @Override
> public void listFlights(CallContext context, Criteria criteria,
> StreamListener<FlightInfo> listener) {
> dataInMemory.forEach((k, v) -> {
> FlightInfo flightInfo = getFlightInfo(null, k);
> listener.onNext(flightInfo);
> }
> );
> listener.onCompleted();
> }
> }).build();
> try {
> flightServer.start();
> } catch (IOException e) {
> e.printStackTrace();
> }
> }
> // Client
> try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
> // Populate data
> FlightClient flightClient = FlightClient.builder(allocator,
> location).build();
> Schema schema = new Schema(Arrays.asList( new Field("name",
> FieldType.nullable(new ArrowType.Utf8()), null)));
> VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema,
> allocator);
> VarCharVector varCharVector = (VarCharVector)
> vectorSchemaRoot.getVector("name");
> varCharVector.allocateNew(3);
> varCharVector.set(0, "Ronald".getBytes());
> varCharVector.set(1, "David".getBytes());
> varCharVector.set(2, "Francisco".getBytes());
> varCharVector.setValueCount(3);
> vectorSchemaRoot.setRowCount(3);
> FlightClient.ClientStreamListener listener =
> flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot,
> new AsyncPutListener());
> listener.putNext();
> vectorSchemaRoot.allocateNew();
> varCharVector.set(0, "Manuel".getBytes());
> varCharVector.set(1, "Felipe".getBytes());
> varCharVector.set(2, "JJ".getBytes());
> varCharVector.setValueCount(3);
> vectorSchemaRoot.setRowCount(3);
> listener.putNext();
> vectorSchemaRoot.clear();
> listener.completed();
> listener.getResult();
> // Do delete action
> Iterator<Result> deleteActionResult = flightClient.doAction(new
> Action("DELETE",
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
> ));
> while(deleteActionResult.hasNext()){
> Result result = deleteActionResult.next();
> System.out.println("Do Delete Action: " + new
> String(result.getBody(), StandardCharsets.UTF_8));
> }
> // Get all metadata information
> Iterable<FlightInfo> flightInfos =
> flightClient.listFlights(Criteria.ALL);
> flightInfos.forEach(t -> System.out.println(t));
> System.out.println("List Flights Info (after delete): No records");
> }
> .. testoutput::
> Do Delete Action: Delete completed
> List Flights Info (after delete): No records {code}
>
> How it could be:
> Show the user only the main code, while running all the code needed behind
> the scenes:
> {code:java}
> // Server
> @Override
> public void doAction(CallContext context, Action action,
> StreamListener<Result> listener) {
> FlightDescriptor flightDescriptor = FlightDescriptor.path(new
> String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for
> key configured
> if(dataInMemory.containsKey(flightDescriptor)) {
> switch (action.getType()) {
> case "DELETE":
> dataInMemory.remove(flightDescriptor);
> Result result = new Result("Delete
> completed".getBytes(StandardCharsets.UTF_8));
> listener.onNext(result);
> }
> listener.onCompleted();
> }
> }
> // Client
> // Do delete action
> Iterator<Result> deleteActionResult = flightClient.doAction(new
> Action("DELETE",
> FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
> ));
> while(deleteActionResult.hasNext()){
> Result result = deleteActionResult.next();
> System.out.println("Do Delete Action: " + new String(result.getBody(),
> StandardCharsets.UTF_8));
> }
> // Get all metadata information
> Iterable<FlightInfo> flightInfos = flightClient.listFlights(Criteria.ALL);
> flightInfos.forEach(t -> System.out.println(t));
> System.out.println("List Flights Info (after delete): No records");
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)