davisusanibar commented on issue #315:
URL: https://github.com/apache/arrow-cookbook/issues/315#issuecomment-1647995983
Hi @pronzato, version 13.0.0 is currently under vote ([VOTE] Release Apache Arrow 13.0.0 - RC0).
Let's push this into the next version, 14.0.0.
>I managed to get it working with your guidance - looks
great.
@pronzato Would it be possible to share your code so we can turn it into a recipe
as well?
---
I am working on a recipe that also uses the JDBC Java module to
read MySQL data, load it into an ArrowReader, and persist it to
a Parquet file.
**Parquet files are being created without issues**, but I am encountering
errors such as:
- Closed with outstanding buffers allocated ...
- RefCnt has gone negative ...
```java
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.ibatis.jdbc.ScriptRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
public class WriteArrowObjectsToParquet {
public static void main(String[] args) throws Exception {
String uri =
"file:///Users/dsusanibar/ddsastorage/localarrowrepository/jarfromgithub/tmp/tocookbooks/src/main/resources/write/test3";
try (
final BufferAllocator allocator = new RootAllocator();
final BufferAllocator allocatorJDBC =
allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
final BufferAllocator allocatorReader =
allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
final BufferAllocator allocatorParquetWrite =
allocator.newChildAllocator("allocatorParquetWrite", 0, Long.MAX_VALUE);
final Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")
) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./src/main/resources/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./src/main/resources/h2-dml.sql")));
JdbcToArrowConfig config = new
JdbcToArrowConfigBuilder(allocatorJDBC,
JdbcToArrowUtils.getUtcCalendar())
.setTargetBatchSize(2)
.setArraySubTypeByColumnNameMap(
new HashMap() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.build();
String query = "SELECT int_field1, bool_field2, bigint_field5,
char_field16, list_field19 FROM TABLE1";
try (
final ResultSet resultSetConvertToParquet =
connection.createStatement().executeQuery(query);
final ResultSet resultSetForSchema =
connection.createStatement().executeQuery(query);
final ArrowVectorIterator arrowVectorIterator =
JdbcToArrow.sqlToArrowVectorIterator(
resultSetConvertToParquet, config);
) {
Schema schema =
JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema,
config).next().getSchema();
try (
final JDBCReader arrowReader = new
JDBCReader(allocatorReader, arrowVectorIterator, schema)
) {
DatasetFileWriter.write(allocatorParquetWrite,
arrowReader, FileFormat.PARQUET, uri);
}
}
runnerDDLDML.closeConnection();
} catch (SQLException | IOException e) {
e.printStackTrace();
}
}
}
// Adapts an ArrowVectorIterator (output of the Arrow JDBC adapter) to the
// ArrowReader interface expected by DatasetFileWriter. Each loadNextBatch()
// copies one JDBC batch into this reader's own VectorSchemaRoot.
// NOTE(review): the iterator is owned by the caller (closed in the caller's
// try-with-resources), which is presumably why closeReadSource() is a no-op
// here — confirm the caller always closes it.
class JDBCReader extends ArrowReader {
// Source of JDBC-converted batches; not owned by this reader.
private final ArrowVectorIterator iter;
// Schema returned by readSchema(); captured up front by the caller.
private final Schema schema;
public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter,
Schema schema) {
super(allocator);
this.iter = iter;
this.schema = schema;
}
// Loads the next non-empty batch into this reader's root. Returns true when
// a batch was loaded, false when the iterator is exhausted or yields an
// empty batch (an empty batch is treated as end-of-stream).
@Override
public boolean loadNextBatch() throws IOException {
while (iter.hasNext()) {
// try-with-resources closes rootTmp after its data has been copied out,
// releasing the JDBC-side buffers as soon as the batch is transferred.
try (VectorSchemaRoot rootTmp = iter.next()) {
if ( rootTmp.getRowCount() > 0 ) {
// Unload from the temporary root and re-load into this reader's own
// root (the one backed by the allocator passed to the constructor).
VectorUnloader unloader = new VectorUnloader(rootTmp);
VectorLoader loader = new
VectorLoader(super.getVectorSchemaRoot());
// The record batch holds buffer references; close it once loaded.
try (ArrowRecordBatch recordBatch =
unloader.getRecordBatch()) {
loader.load(recordBatch);
}
return true;
} else {
return false;
}
}
}
return false;
}
// No byte accounting is tracked for this reader.
@Override
public long bytesRead() {
return 0;
}
// Intentionally empty: the ArrowVectorIterator is closed by the caller,
// not by this reader.
@Override
protected void closeReadSource() throws IOException {
}
@Override
protected Schema readSchema() {
return schema;
}
}
```
Error messages:
```
Exception in thread "Thread-8" java.lang.IllegalStateException: RefCnt has
gone negative
...
Suppressed: java.lang.IllegalStateException:
Allocator[allocatorParquetWrite] closed with outstanding buffers allocated (12).
Allocator(allocatorParquetWrite) 0/17746/51748/9223372036854775807
(res/actual/peak/limit)
child allocators: 0
...
Suppressed: java.lang.IllegalStateException: Allocator[allocatorJDBC] closed
with outstanding buffers allocated (8).
Allocator(allocatorJDBC) 0/49760/99536/9223372036854775807
(res/actual/peak/limit)
child allocators: 0
...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]