davisusanibar commented on issue #36443:
URL: https://github.com/apache/arrow/issues/36443#issuecomment-1687395890
Hi @hu6360567, sorry for joining late.
Could you please validate whether this works on your side?
I am able to create a Parquet file with data obtained from the database through the
JDBC adapter and then read it from the Python side via the C Data Interface.
Testing:
```
1. Create the jar with dependencies: `mvn clean package`
2. Print a log of the data read: `python src/main/java/org/example/consumer/consumerReaderAPI.py 2 True log`
3. Create a Parquet file: `python src/main/java/org/example/consumer/consumerReaderAPI.py 2 True parquet`
4. Validate the Parquet file: `parquet-tools cat jdbc/parquet/part-0.parquet`
```
Java producer of the Arrow reader:
```java
package org.example.cdata;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.ibatis.jdbc.ScriptRunner;
public class JavaReaderApi {
final static BufferAllocator allocator = new RootAllocator();
public static BufferAllocator getAllocatorForJavaConsumers() {
return allocator;
}
public static ArrowReader getArrowReaderForJavaConsumers(int batchSize,
boolean reuseVSR) {
System.out.println("Java Parameters: BatchSize = " + batchSize + ",
reuseVSR = " + reuseVSR);
String query = "SELECT int_field1, bool_field2, bigint_field5,
char_field16, list_field19 FROM TABLE1";
final Connection connection;
try {
connection =
DriverManager.getConnection("jdbc:h2:mem:h2-jdbc-adapter");
} catch (SQLException e) {
throw new RuntimeException(e);
}
final ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
try {
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./src/main/resources/h2-ddl.sql")));
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
try {
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./src/main/resources/h2-dml.sql")));
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
final JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
JdbcToArrowUtils.getUtcCalendar())
.setTargetBatchSize(batchSize)
.setReuseVectorSchemaRoot(reuseVSR)
.setArraySubTypeByColumnNameMap(
new HashMap<String, JdbcFieldInfo>() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.build();
final ResultSet resultSetConvertToParquet;
try {
resultSetConvertToParquet =
connection.createStatement().executeQuery(query);
} catch (SQLException e) {
throw new RuntimeException(e);
}
final ArrowVectorIterator arrowVectorIterator;
try {
arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSetConvertToParquet, config);
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
// get jdbc row data as an arrow reader
final ArrowReader arrowReader = new JDBCReader(allocator,
arrowVectorIterator, config);
return arrowReader;
}
}
class JDBCReader extends ArrowReader {
private final ArrowVectorIterator iter;
private final JdbcToArrowConfig config;
private VectorSchemaRoot root;
private boolean firstRoot = true;
public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter,
JdbcToArrowConfig config) {
super(allocator);
this.iter = iter;
this.config = config;
}
@Override
public boolean loadNextBatch() throws IOException {
// The first call only reports that a batch is available; the initial batch
// itself is materialized lazily in getVectorSchemaRoot().
if (firstRoot) {
firstRoot = false;
return true;
}
else {
if (iter.hasNext()) {
if (root != null && !config.isReuseVectorSchemaRoot()) {
root.close();
}
else {
root.allocateNew();
}
root = iter.next();
return root.getRowCount() != 0;
}
else {
return false;
}
}
}
@Override
public long bytesRead() {
return 0;
}
@Override
protected void closeReadSource() throws IOException {
if (root != null && !config.isReuseVectorSchemaRoot()) {
root.close();
}
}
@Override
protected Schema readSchema() throws IOException {
return null;
}
@Override
public VectorSchemaRoot getVectorSchemaRoot() throws IOException {
if (root == null) {
root = iter.next();
}
return root;
}
}
```
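For a quick Java-only sanity check of the reader (a minimal sketch; the `Main` class below is hypothetical and assumes the H2 DDL/DML scripts are reachable at the same relative paths used in `JavaReaderApi`):
```java
package org.example.cdata;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;

public class Main {
  public static void main(String[] args) throws Exception {
    // Build the reader exactly as it is handed to the Python side, but drain it in Java.
    try (ArrowReader reader = JavaReaderApi.getArrowReaderForJavaConsumers(2, true)) {
      while (reader.loadNextBatch()) {
        VectorSchemaRoot root = reader.getVectorSchemaRoot();
        System.out.println("Rows in batch: " + root.getRowCount());
      }
    }
  }
}
```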
Python Side:
```python
import jpype
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.dataset as ds
import sys
from pyarrow.cffi import ffi
def getRecordBatchReader(py_stream_ptr):
generator = getIterableRecordBatchReader(py_stream_ptr)
schema = next(generator)
return pa.RecordBatchReader.from_batches(schema, generator)
def getIterableRecordBatchReader(py_stream_ptr):
with pa.RecordBatchReader._import_from_c(py_stream_ptr) as reader:
yield reader.schema
yield from reader
# batchSize = int(sys.argv[1]), reuseVSR = eval(sys.argv[2]), log|parquet|csv = str(sys.argv[3])
jpype.startJVM(classpath=[
"./target/java-python-by-cdata-1.0-SNAPSHOT-jar-with-dependencies.jar"])
java_reader_api = jpype.JClass('org.example.cdata.JavaReaderApi')
java_c_package = jpype.JPackage("org").apache.arrow.c
py_stream = ffi.new("struct ArrowArrayStream*")
py_stream_ptr = int(ffi.cast("uintptr_t", py_stream))
java_wrapped_stream = java_c_package.ArrowArrayStream.wrap(py_stream_ptr)
# get reader data exported into memoryAddress
print('Python Parameters: BatchSize = ' + sys.argv[1] + ', reuseVSR = ' +
sys.argv[2])
java_c_package.Data.exportArrayStream(
java_reader_api.getAllocatorForJavaConsumers(),
java_reader_api.getArrowReaderForJavaConsumers(int(sys.argv[1]),
eval(sys.argv[2])),
java_wrapped_stream)
with getRecordBatchReader(py_stream_ptr) as streamsReaderForJava:
# print logs
if str(sys.argv[3]) == 'log':
for batch in streamsReaderForJava:
print(batch.num_rows)
print(batch.num_columns)
print(batch.to_pylist())
# create parquet file
elif str(sys.argv[3]) == 'parquet':
ds.write_dataset(streamsReaderForJava,
'./jdbc/parquet',
format="parquet")
# create csv file
elif str(sys.argv[3]) == 'csv':
with csv.CSVWriter('./jdbc/csv',
streamsReaderForJava.schema) as writer:
for record_batch in streamsReaderForJava:
writer.write_batch(record_batch)
else:
print('Invalid parameter. Values supported are: {log, parquet, csv}')
```
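To double-check the result without parquet-tools, the Parquet output from step 3 can also be read back with pyarrow (a minimal sketch; the `part-0.parquet` name matches step 4 above):
```python
import pyarrow.parquet as pq

# Read back the file produced by the 'parquet' branch above and inspect it.
table = pq.read_table('./jdbc/parquet/part-0.parquet')
print(table.schema)
print(table.to_pylist())
```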
Java POM
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>java-python-by-cdata</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>java-python-by-cdata</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<arrow.version>12.0.0</arrow.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-jdbc</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-dataset</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<dependency>
<groupId>org.apache.ibatis</groupId>
<artifactId>ibatis-core</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.1.214</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.example.cdata.JavaReaderApi</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```
DML/DDL
```
h2-ddl.sql:
create table TABLE1 (
INT_FIELD1 int,
BOOL_FIELD2 boolean,
TINYINT_FIELD3 smallint,
SMALLINT_FIELD4 smallint,
BIGINT_FIELD5 bigint,
DECIMAL_FIELD6 numeric(14, 3),
DOUBLE_FIELD7 double,
REAL_FIELD8 real,
TIME_FIELD9 time,
DATE_FIELD10 date,
TIMESTAMP_FIELD11 timestamp,
BINARY_FIELD12 blob,
VARCHAR_FIELD13 varchar(256),
BLOB_FIELD14 blob,
CLOB_FIELD15 clob,
CHAR_FIELD16 char(16),
BIT_FIELD17 boolean,
NULL_FIELD18 null,
LIST_FIELD19 int array
);
h2-dml.sql:
INSERT INTO table1 VALUES (101, 1, 45, 12000, 1000000000300.0000001,
17345667789.111, 56478356785.345, 56478356785.345, PARSEDATETIME('12:45:35
GMT', 'HH:mm:ss z'),
PARSEDATETIME('2018-02-12 GMT', 'yyyy-MM-dd z'),
PARSEDATETIME('2018-02-12 12:45:35 GMT', 'yyyy-MM-dd HH:mm:ss z'),
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to varchar',
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to clob', 'some char text', 1, null,
ARRAY[1, 2, 3]);
INSERT INTO table1 VALUES (102, 1, 45, 12000, 100000000030.00000011,
17345667789.222, 56478356785.345, 56478356785.345, PARSEDATETIME('12:45:35
GMT', 'HH:mm:ss z'),
PARSEDATETIME('2018-02-12 GMT', 'yyyy-MM-dd z'),
PARSEDATETIME('2018-02-12 12:45:35 GMT', 'yyyy-MM-dd HH:mm:ss z'),
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to varchar',
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to clob', 'some char text', 1, null,
ARRAY[1, 2]);
INSERT INTO table1 VALUES (103, 1, 45, 12000, 10000000003.000000111,
17345667789.333, 56478356785.345, 56478356785.345, PARSEDATETIME('12:45:35
GMT', 'HH:mm:ss z'),
PARSEDATETIME('2018-02-12 GMT', 'yyyy-MM-dd z'),
PARSEDATETIME('2018-02-12 12:45:35 GMT', 'yyyy-MM-dd HH:mm:ss z'),
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to varchar',
'736f6d6520746578742074686174206e6565647320746f20626520636f6e76657274656420746f2062696e617279',
'some text that needs to be converted to clob', 'some char text', 1, null,
ARRAY[1]);
```