[
https://issues.apache.org/jira/browse/ARROW-5658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16873060#comment-16873060
]
luckily commented on ARROW-5658:
--------------------------------
Hi LiyaFan
I need to do an asynchronous process to prevent the IO thread from blocking. In
many cases, the row data is not immediately processed, placed in an
asynchronous queue, but cannot be placed directly. It is necessary to create a
same container, if there is such an api, you can copy Root once, Create a same
data
> [JAVA] apache arrow-flight cannot send listvector
> --------------------------------------------------
>
> Key: ARROW-5658
> URL: https://issues.apache.org/jira/browse/ARROW-5658
> Project: Apache Arrow
> Issue Type: Bug
> Components: FlightRPC, Java
> Affects Versions: 0.13.0
> Environment: java8 arrow-java 0.13.0
> Reporter: luckily
> Assignee: Liya Fan
> Priority: Major
> Labels: pull-request-available
> Attachments: ClientStart.java, ClientStart.java, ServerStart.java,
> image-2019-06-25-17-58-09-038.png, image-2019-06-25-17-59-07-352.png,
> image-2019-06-26-16-07-23-366.png, image-2019-06-26-16-09-30-275.png, pom.xml
>
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> I can't transfer using apache arrow-flihgt. Contains listvector data. The
> problem description is as follows:
> {quote} # I parse an xml file and convert it to an arrow format and finally
> convert it to a parquet data format. The address of the .xml file data is url
> [http://www.w3school.com.cn/example/xmle/cd_catalog.xml|http://www.w3school.com.cn/example/xmle/cd_catalog.xml)]
> # I created a schema that uses listvector.
> code show as below:
> List<FiledVector> list =
> childrenBuilder.add(ListVector.empty(column.getId().toString(),allocator));
> VectorSchemaRoot root = VectorSchemaRoot.of(inVector)
> # Parse the xml file to get the list data in "cd". Use api use listvector.
> `ListVector listVector = (ListVector) valueVectors;
> List<Column> columns = column.getColumns();
> Column column1 = columns.get(0);
> String name = column1.getId().toString();
> UnionListWriter writer = listVector.getWriter();
> Writer.allocate();
> For (int j = 0; j < column1.getColumns().size();j++) {
> writer.setPosition(j);
> writer.startList();
> Writer.list().startList();
> Column column2 = column1.getColumns().get(j);
> List<Map<String, String>> lst = (List<Map<String,
> String>>) ((Map) val).get(name);
> For (int k = 0; k < lst.size(); k++) {
> Map<String, String> stringStringMap = lst.get(k);
> String value =
> stringStringMap.get(column2.getId().toString());
> Switch (column2.getType()) {
> Case FLOAT:
>
> Writer.list().float4().writeFloat4(stringConvertFloat(value));
> Break;
> Case BOOLEAN:
>
> Writer.list().bit().writeBit(stringConvertBoolean(value));
> Break;
> Case DECIMAL:
>
> Writer.list().decimal().writeDecimal(stringConvertDecimal(value,column2.getScale()));
> Break;
> Case TIMESTAMP:
>
> Writer.list().dateMilli().writeDateMilli(stringConvertTimestamp(value,column2.format.toString()));
> Break;
> Case INTEGER:
> Case BIGINT:
>
> Writer.list().bigInt().writeBigInt(stringConvertLong(value));
> Break;
> Case VARCHAR:
> VarCharHolder varBinaryHolder = new
> VarCharHolder();
> varBinaryHolder.start = 0;
> Byte[] bytes =value.getBytes();
> ArrowBuf buffer =
> listVector.getAllocator().buffer(bytes.length);
> varBinaryHolder.buffer = buffer;
> buffer.writeBytes(bytes);
> varBinaryHolder.end=bytes.length;
>
> Writer.list().varChar().write(varBinaryHolder);
> Break;
> Default:
> Throw new IllegalArgumentException(" error no
> type !!");
> }
> }
> Writer.list().endList();
> writer.endList();
> }`
> 4.
> After the write is complete, I will send to the arrow-flight server. server
> code :
> {quote}
> {quote}@Override
> public Callable<Flight.PutResult> acceptPut(FlightStream flightStream) {
> return () -> {
> try (VectorSchemaRoot root = flightStream.getRoot()) {
> while (flightStream.next()) {
> VectorSchemaRoot other = null;
> try {
> logger.info(" Receive message ...... size: " + root.getRowCount());
> other = copyRoot(root);
> ArrowMessage arrowMessage = new ArrowMessage(other, other.getSchema());
> spmc.offer(arrowMessage);
> } catch (Exception e) {
> logger.error(e.getMessage(), e);
> }
> }
> }
> return Flight.PutResult.parseFrom("ok".getBytes());
> };
> }{quote}
> {quote} But the server did not receive any information.!! it is error .{quote}
> {quote}client code :{quote}
> {quote}root = message.getRoot();
> //client.close();
> FlightClient.ClientStreamListener listener =
> client.startPut(FlightDescriptor.path(message.getFilename()), root);
> listener.putNext();
> listener.completed();
> client.close();
> listener.putNext();
> listener.completed();
> Flight.PutResult result =
> listener.getResult();
> String s = new String(result.toByteArray());
> System.out.println(s);{quote}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)