davisusanibar commented on issue #315: URL: https://github.com/apache/arrow-cookbook/issues/315#issuecomment-1647995983
Hi @pronzato, version 13.0.0 is on [VOTE] Release Apache Arrow 13.0.0 - RC0. Let's push this into the next version, 14.0.0. > I managed to get it working with your guidance - looks great. @pronzato Would it be possible to share your code so it can be loaded into a recipe as well? --- I am working on creating a recipe to also support using the JDBC Java module to read MySQL data, and then load it into an ArrowReader for persistence into a Parquet file. **Parquet files are being created without issues**, but I am encountering errors such as: - Closed with outstanding buffers allocated ... - RefCnt has gone negative ... ```java import org.apache.arrow.adapter.jdbc.ArrowVectorIterator; import org.apache.arrow.adapter.jdbc.JdbcFieldInfo; import org.apache.arrow.adapter.jdbc.JdbcToArrow; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig; import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder; import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils; import org.apache.arrow.dataset.file.DatasetFileWriter; import org.apache.arrow.dataset.file.FileFormat; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.ibatis.jdbc.ScriptRunner; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; import java.util.HashMap; public class WriteArrowObjectsToParquet { public static void main(String[] args) throws Exception { String uri = "file:///Users/dsusanibar/ddsastorage/localarrowrepository/jarfromgithub/tmp/tocookbooks/src/main/resources/write/test3"; try ( final 
BufferAllocator allocator = new RootAllocator(); final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE); final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE); final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0, Long.MAX_VALUE); final Connection connection = DriverManager.getConnection( "jdbc:h2:mem:h2-jdbc-adapter") ) { ScriptRunner runnerDDLDML = new ScriptRunner(connection); runnerDDLDML.setLogWriter(null); runnerDDLDML.runScript(new BufferedReader( new FileReader("./src/main/resources/h2-ddl.sql"))); runnerDDLDML.runScript(new BufferedReader( new FileReader("./src/main/resources/h2-dml.sql"))); JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC, JdbcToArrowUtils.getUtcCalendar()) .setTargetBatchSize(2) .setArraySubTypeByColumnNameMap( new HashMap() {{ put("LIST_FIELD19", new JdbcFieldInfo(Types.INTEGER)); }} ) .build(); String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1"; try ( final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query); final ResultSet resultSetForSchema = connection.createStatement().executeQuery(query); final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator( resultSetConvertToParquet, config); ) { Schema schema = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema(); try ( final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, schema) ) { DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri); } } runnerDDLDML.closeConnection(); } catch (SQLException | IOException e) { e.printStackTrace(); } } } class JDBCReader extends ArrowReader { private final ArrowVectorIterator iter; private final Schema schema; public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, Schema 
schema) { super(allocator); this.iter = iter; this.schema = schema; } @Override public boolean loadNextBatch() throws IOException { while (iter.hasNext()) { try (VectorSchemaRoot rootTmp = iter.next()) { if ( rootTmp.getRowCount() > 0 ) { VectorUnloader unloader = new VectorUnloader(rootTmp); VectorLoader loader = new VectorLoader(super.getVectorSchemaRoot()); try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) { loader.load(recordBatch); } return true; } else { return false; } } } return false; } @Override public long bytesRead() { return 0; } @Override protected void closeReadSource() throws IOException { } @Override protected Schema readSchema() { return schema; } } ``` Error messages: ``` Exception in thread "Thread-8" java.lang.IllegalStateException: RefCnt has gone negative ... Suppressed: java.lang.IllegalStateException: Allocator[allocatorParquetWrite] closed with outstanding buffers allocated (12). Allocator(allocatorParquetWrite) 0/17746/51748/9223372036854775807 (res/actual/peak/limit) child allocators: 0 ... Suppressed: java.lang.IllegalStateException: Allocator[allocatorJDBC] closed with outstanding buffers allocated (8). Allocator(allocatorJDBC) 0/49760/99536/9223372036854775807 (res/actual/peak/limit) child allocators: 0 ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org