David Dali Susanibar Arce created ARROW-15703:
-------------------------------------------------
Summary: [Java]: Create custom Sphinx plugin to show only the highlighted part
of verbose Java code
Key: ARROW-15703
URL: https://issues.apache.org/jira/browse/ARROW-15703
Project: Apache Arrow
Issue Type: Sub-task
Reporter: David Dali Susanibar Arce
Assignee: David Dali Susanibar Arce
We run the Java cookbook code through Sphinx using our custom extension
[https://github.com/apache/arrow-cookbook/blob/main/java/ext/javadoctest.py]
We need to create another extension that shows end users only the Java code
relevant to each recipe, while still running the complete cookbook code during
testing.
Current documentation:
{code:java}
Validate Delete Data
********************
And confirm that it's been deleted:
.. testcode::
import org.apache.arrow.flight.Action;
import org.apache.arrow.flight.AsyncPutListener;
import org.apache.arrow.flight.Criteria;
import org.apache.arrow.flight.FlightClient;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightServer;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.PutResult;
import org.apache.arrow.flight.Result;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
 * Holds one uploaded dataset: the record batches received for it, their schema,
 * and the total number of rows.
 *
 * <p>The batch list is stored by reference, not copied.
 */
class DataInMemory {
    private final List<ArrowRecordBatch> listArrowRecordBatch;
    private final Schema schema;
    private final Long rows;

    /**
     * @param listArrowRecordBatch batches making up the dataset (stored as-is)
     * @param schema               schema shared by all batches
     * @param rows                 total row count across all batches
     */
    public DataInMemory(List<ArrowRecordBatch> listArrowRecordBatch, Schema schema, Long rows) {
        this.listArrowRecordBatch = listArrowRecordBatch;
        this.schema = schema;
        this.rows = rows;
    }

    /** @return the stored batches (the same list instance passed to the constructor) */
    public List<ArrowRecordBatch> getListArrowRecordBatch() {
        return listArrowRecordBatch;
    }

    /** @return the dataset schema */
    public Schema getSchema() {
        return schema;
    }

    /** @return the total row count */
    public Long getRows() {
        return rows;
    }
}
// Server
Location location = Location.forGrpcInsecure("0.0.0.0", 33333);
// Uploaded datasets, keyed by the descriptor the client used for the put.
Map<FlightDescriptor, DataInMemory> dataInMemory = new HashMap<>();
// The running server keeps allocating from this allocator after start() returns,
// so it is deliberately not scoped to a try-with-resources block; close it only
// after the server has been shut down.
RootAllocator serverAllocator = new RootAllocator(Long.MAX_VALUE);
FlightServer flightServer = FlightServer.builder(serverAllocator, location, new NoOpFlightProducer() {
    /** Stores every batch of an uploaded stream under the stream's descriptor. */
    @Override
    public Runnable acceptPut(CallContext context, FlightStream flightStream,
            StreamListener<PutResult> ackStream) {
        return () -> {
            // A fresh list per upload so that datasets never share batches.
            List<ArrowRecordBatch> batches = new ArrayList<>();
            long rows = 0;
            while (flightStream.next()) {
                VectorUnloader unloader = new VectorUnloader(flightStream.getRoot());
                // The batch is retained for later use, so it must NOT be closed here;
                // it is released when its dataset is replaced or deleted.
                batches.add(unloader.getRecordBatch());
                rows += flightStream.getRoot().getRowCount();
            }
            DataInMemory previous = dataInMemory.put(flightStream.getDescriptor(),
                    new DataInMemory(batches, flightStream.getSchema(), rows));
            if (previous != null) {
                previous.getListArrowRecordBatch().forEach(ArrowRecordBatch::close);
            }
            ackStream.onCompleted();
        };
    }

    /**
     * Handles the "DELETE" action; the action body is the UTF-8 descriptor path of
     * the dataset to drop.
     */
    @Override
    public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
        // Recover the storage key from the action body.
        FlightDescriptor flightDescriptor = FlightDescriptor.path(
                new String(action.getBody(), StandardCharsets.UTF_8));
        if ("DELETE".equals(action.getType())) {
            DataInMemory removed = dataInMemory.remove(flightDescriptor);
            if (removed != null) {
                removed.getListArrowRecordBatch().forEach(ArrowRecordBatch::close);
                listener.onNext(new Result("Delete completed".getBytes(StandardCharsets.UTF_8)));
            }
        }
        // Complete the stream in every case; otherwise a request for an unknown key
        // is never finished and the client's result iterator blocks.
        listener.onCompleted();
    }

    /** Describes one stored dataset; unknown descriptors are rejected. */
    @Override
    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
        DataInMemory data = dataInMemory.get(descriptor);
        if (data == null) {
            throw new IllegalStateException("Unknown descriptor.");
        }
        // The ticket carries the descriptor's first path element as the key for this data.
        Ticket ticket = new Ticket(descriptor.getPath().get(0).getBytes(StandardCharsets.UTF_8));
        return new FlightInfo(
                data.getSchema(),
                descriptor,
                Collections.singletonList(new FlightEndpoint(ticket, location)),
                // NOTE(review): this is the whole allocator's usage, not this dataset's size.
                serverAllocator.getAllocatedMemory(),
                data.getRows());
    }

    /** Streams a FlightInfo for every stored dataset (the criteria are ignored). */
    @Override
    public void listFlights(CallContext context, Criteria criteria,
            StreamListener<FlightInfo> listener) {
        dataInMemory.keySet().forEach(descriptor -> listener.onNext(getFlightInfo(null, descriptor)));
        listener.onCompleted();
    }
}).build();
try {
    flightServer.start();
} catch (IOException e) {
    throw new RuntimeException("Failed to start Flight server at " + location, e);
}
// Client
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)){
// Populate data
FlightClient flightClient = FlightClient.builder(allocator,
location).build();
Schema schema = new Schema(Arrays.asList( new Field("name",
FieldType.nullable(new ArrowType.Utf8()), null)));
VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(schema,
allocator);
VarCharVector varCharVector = (VarCharVector)
vectorSchemaRoot.getVector("name");
varCharVector.allocateNew(3);
varCharVector.set(0, "Ronald".getBytes());
varCharVector.set(1, "David".getBytes());
varCharVector.set(2, "Francisco".getBytes());
varCharVector.setValueCount(3);
vectorSchemaRoot.setRowCount(3);
FlightClient.ClientStreamListener listener =
flightClient.startPut(FlightDescriptor.path("profiles"), vectorSchemaRoot, new
AsyncPutListener());
listener.putNext();
vectorSchemaRoot.allocateNew();
varCharVector.set(0, "Manuel".getBytes());
varCharVector.set(1, "Felipe".getBytes());
varCharVector.set(2, "JJ".getBytes());
varCharVector.setValueCount(3);
vectorSchemaRoot.setRowCount(3);
listener.putNext();
vectorSchemaRoot.clear();
listener.completed();
listener.getResult();
// Do delete action
Iterator<Result> deleteActionResult = flightClient.doAction(new
Action("DELETE",
FlightDescriptor.path("profiles").getPath().get(0).getBytes(StandardCharsets.UTF_8)
));
while(deleteActionResult.hasNext()){
Result result = deleteActionResult.next();
System.out.println("Do Delete Action: " + new
String(result.getBody(), StandardCharsets.UTF_8));
}
// Get all metadata information
Iterable<FlightInfo> flightInfos =
flightClient.listFlights(Criteria.ALL);
flightInfos.forEach(t -> System.out.println(t));
System.out.println("List Flights Info (after delete): No records");
}
.. testoutput::
Do Delete Action: Delete completed
List Flights Info (after delete): No records {code}
How it could be:
Show the user only the main code, while running all of the required code behind
the scenes.
{code:java}
// Server
@Override
public void doAction(CallContext context, Action action, StreamListener<Result>
listener) {
FlightDescriptor flightDescriptor = FlightDescriptor.path(new
String(action.getBody(), StandardCharsets.UTF_8)); // For recover data for key
configured
if(dataInMemory.containsKey(flightDescriptor)) {
switch (action.getType()) {
case "DELETE":
dataInMemory.remove(flightDescriptor);
Result result = new Result("Delete
completed".getBytes(StandardCharsets.UTF_8));
listener.onNext(result);
}
listener.onCompleted();
}
}
// Client
// Do delete action: the action body is the descriptor path.
Iterator<Result> deleteActionResult = flightClient.doAction(
        new Action("DELETE", "profiles".getBytes(StandardCharsets.UTF_8)));
while (deleteActionResult.hasNext()) {
    Result result = deleteActionResult.next();
    System.out.println("Do Delete Action: " + new String(result.getBody(), StandardCharsets.UTF_8));
}
// Get all metadata information; report "No records" only if nothing is left.
boolean anyFlights = false;
for (FlightInfo flightInfo : flightClient.listFlights(Criteria.ALL)) {
    System.out.println(flightInfo);
    anyFlights = true;
}
if (!anyFlights) {
    System.out.println("List Flights Info (after delete): No records");
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)