pronzato commented on issue #315:
URL: https://github.com/apache/arrow-cookbook/issues/315#issuecomment-1648545016

   Hi David,
   
   I'm not super familiar with the memory-management side yet but this is the
   very simple POC I cobbled together (probably missed some memory-management
   steps but I was able to convert a ResultSet into a number of Parquet files
   with the ResultSet row count matching the number of records in the
   resulting Parquet files).
   
   If I remember rightly, the main issue was that the Schema is taken from the
   first VectorSchemaRoot, and fetching that root advances the iterator, so I
   had to work around that to get every batch written correctly.
   
   Again, this was just a simple POC so I'd be really interested in seeing
   your final version.
   
   public static void write(BufferAllocator allocator, ResultSet rs,
           FileFormat fileFormat, String path, String[] partitions,
           int maxPartitions, String baseNameTemplate) throws Exception {
       ArrowVectorIterator i = null;
       try {
           i = JdbcToArrow.sqlToArrowVectorIterator(rs, allocator);
           ArrowSqlWriterFacade reader = new ArrowSqlWriterFacade(i, allocator);
           DatasetFileWriter.write(allocator, reader, fileFormat, "file://" + path,
                   partitions, maxPartitions, baseNameTemplate);
       } finally {
           if (i != null) {
               i.close();
           }
       }
   }
   
   public class ArrowSqlWriterFacade extends ArrowReader {

       private final ArrowVectorIterator i;
       private VectorSchemaRoot root;
       // True until loadNextBatch() has been called once.
       private boolean first = true;

       public ArrowSqlWriterFacade(ArrowVectorIterator i, BufferAllocator allocator) {
           super(allocator);
           this.i = i;
       }

       public ArrowSqlWriterFacade(ArrowVectorIterator i, BufferAllocator allocator,
               CompressionCodec.Factory factory) {
           super(allocator, factory);
           this.i = i;
       }

       @Override
       public boolean loadNextBatch() throws IOException {
           if (first) {
               // The first batch is pulled lazily by getVectorSchemaRoot()
               // (for example when the schema is read), so the iterator is
               // not advanced here.
               first = false;
               return true;
           }
           boolean loadNext = i.hasNext();
           if (loadNext) {
               root = i.next();
           }
           return loadNext;
       }

       // Must be public (not private) to override ArrowReader.getVectorSchemaRoot().
       @Override
       public VectorSchemaRoot getVectorSchemaRoot() {
           if (root == null) {
               root = i.next();
           }
           return root;
       }

       @Override
       public long bytesRead() {
           return 0;
       }

       @Override
       public void closeReadSource() throws IOException {
           // Nothing to close here; the caller closes the iterator.
       }

       @Override
       protected Schema readSchema() throws IOException {
           // Reading the schema fetches (and keeps) the first batch.
           return getVectorSchemaRoot().getSchema();
       }

   }
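
The schema issue described above (reading the schema from the first batch advances the iterator) comes down to peeking at the first element without losing it. Below is a minimal, JDK-only sketch of that idea, not the Arrow API: `PeekingIterator` and `PeekDemo` are hypothetical names, and plain strings stand in for the `VectorSchemaRoot` batches.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Wraps an iterator so the next element can be inspected without being lost. */
final class PeekingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private T buffered;          // element fetched by peek() but not yet consumed
    private boolean hasBuffered; // true while 'buffered' holds a pending element

    PeekingIterator(Iterator<T> delegate) {
        this.delegate = delegate;
    }

    /** Returns the next element without consuming it. */
    T peek() {
        if (!hasBuffered) {
            if (!delegate.hasNext()) {
                throw new NoSuchElementException();
            }
            buffered = delegate.next();
            hasBuffered = true;
        }
        return buffered;
    }

    @Override
    public boolean hasNext() {
        return hasBuffered || delegate.hasNext();
    }

    @Override
    public T next() {
        if (hasBuffered) {
            // Hand out the peeked element first so it is not skipped.
            T value = buffered;
            buffered = null;
            hasBuffered = false;
            return value;
        }
        return delegate.next();
    }
}

final class PeekDemo {
    public static void main(String[] args) {
        // Strings stand in for batches (an assumption made for illustration).
        PeekingIterator<String> batches =
                new PeekingIterator<>(List.of("batch-0", "batch-1").iterator());
        String first = batches.peek(); // e.g. read the schema from this batch
        int count = 0;
        while (batches.hasNext()) {
            batches.next();
            count++;
        }
        System.out.println(first + " peeked, " + count + " batches consumed");
    }
}
```

The facade above follows the same pattern: the root fetched for the schema is kept and reported by the first `loadNextBatch()` call rather than being dropped.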
   
   On Mon, Jul 24, 2023 at 10:10 AM david dali susanibar arce <
   ***@***.***> wrote:
   
   > Hi @pronzato <https://github.com/pronzato>, version 13.0.0 is already at
   > [VOTE] Release Apache Arrow 13.0.0 - RC0. Let's push it into the next
   > version, 14.0.0.
   >
   > I managed to get it working with your guidance - looks
   > great.
   >
   > @pronzato <https://github.com/pronzato> Would it be possible to share
   > your code so it can also be included in a recipe?
   > ------------------------------
   >
   > I am working on a recipe that also uses the JDBC Java module to read
   > MySQL data and then load it into an ArrowReader for persistence into a
   > Parquet file.
   >
   > *Parquet files are being created without issues*, but I am encountering
   > errors such as:
   >
   >    - Closed with outstanding buffers allocated ...
   >    - RefCnt has gone negative ...
   >
   > import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
   > import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
   > import org.apache.arrow.adapter.jdbc.JdbcToArrow;
   > import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
   > import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
   > import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
   > import org.apache.arrow.dataset.file.DatasetFileWriter;
   > import org.apache.arrow.dataset.file.FileFormat;
   > import org.apache.arrow.memory.BufferAllocator;
   > import org.apache.arrow.memory.RootAllocator;
   > import org.apache.arrow.vector.VectorLoader;
   > import org.apache.arrow.vector.VectorSchemaRoot;
   > import org.apache.arrow.vector.VectorUnloader;
   > import org.apache.arrow.vector.ipc.ArrowReader;
   > import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
   > import org.apache.arrow.vector.types.pojo.Schema;
   > import org.apache.ibatis.jdbc.ScriptRunner;
   >
   > import java.io.BufferedReader;
   > import java.io.FileReader;
   > import java.io.IOException;
   > import java.sql.Connection;
   > import java.sql.DriverManager;
   > import java.sql.ResultSet;
   > import java.sql.SQLException;
   > import java.sql.Types;
   > import java.util.HashMap;
   >
   > public class WriteArrowObjectsToParquet {
   >     public static void main(String[] args) throws Exception {
   >         String uri =
   >                 "file:///Users/dsusanibar/ddsastorage/localarrowrepository/jarfromgithub/tmp/tocookbooks/src/main/resources/write/test3";
   >         try (
   >             final BufferAllocator allocator = new RootAllocator();
   >             final BufferAllocator allocatorJDBC =
   >                     allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
   >             final BufferAllocator allocatorReader =
   >                     allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
   >             final BufferAllocator allocatorParquetWrite =
   >                     allocator.newChildAllocator("allocatorParquetWrite", 0, Long.MAX_VALUE);
   >             final Connection connection = DriverManager.getConnection(
   >                     "jdbc:h2:mem:h2-jdbc-adapter")
   >         ) {
   >             ScriptRunner runnerDDLDML = new ScriptRunner(connection);
   >             runnerDDLDML.setLogWriter(null);
   >             runnerDDLDML.runScript(new BufferedReader(
   >                     new FileReader("./src/main/resources/h2-ddl.sql")));
   >             runnerDDLDML.runScript(new BufferedReader(
   >                     new FileReader("./src/main/resources/h2-dml.sql")));
   >             JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC,
   >                     JdbcToArrowUtils.getUtcCalendar())
   >                     .setTargetBatchSize(2)
   >                     .setArraySubTypeByColumnNameMap(
   >                             new HashMap() {{
   >                                 put("LIST_FIELD19",
   >                                         new JdbcFieldInfo(Types.INTEGER));
   >                             }}
   >                     )
   >                     .build();
   >             String query = "SELECT int_field1, bool_field2, bigint_field5, "
   >                     + "char_field16, list_field19 FROM TABLE1";
   >             try (
   >                 final ResultSet resultSetConvertToParquet =
   >                         connection.createStatement().executeQuery(query);
   >                 final ResultSet resultSetForSchema =
   >                         connection.createStatement().executeQuery(query);
   >                 final ArrowVectorIterator arrowVectorIterator =
   >                         JdbcToArrow.sqlToArrowVectorIterator(
   >                                 resultSetConvertToParquet, config);
   >             ) {
   >                 Schema schema = JdbcToArrow.sqlToArrowVectorIterator(
   >                         resultSetForSchema, config).next().getSchema();
   >                 try (
   >                     final JDBCReader arrowReader = new JDBCReader(
   >                             allocatorReader, arrowVectorIterator, schema)
   >                 ) {
   >                     DatasetFileWriter.write(allocatorParquetWrite,
   >                             arrowReader, FileFormat.PARQUET, uri);
   >                 }
   >             }
   >             runnerDDLDML.closeConnection();
   >         } catch (SQLException | IOException e) {
   >             e.printStackTrace();
   >         }
   >     }
   > }
   >
   > class JDBCReader extends ArrowReader {
   >     private final ArrowVectorIterator iter;
   >     private final Schema schema;
   >
   >     public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter,
   >             Schema schema) {
   >         super(allocator);
   >         this.iter = iter;
   >         this.schema = schema;
   >     }
   >
   >     @Override
   >     public boolean loadNextBatch() throws IOException {
   >         while (iter.hasNext()) {
   >             try (VectorSchemaRoot rootTmp = iter.next()) {
   >                 if (rootTmp.getRowCount() > 0) {
   >                     VectorUnloader unloader = new VectorUnloader(rootTmp);
   >                     VectorLoader loader =
   >                             new VectorLoader(super.getVectorSchemaRoot());
   >                     try (ArrowRecordBatch recordBatch =
   >                             unloader.getRecordBatch()) {
   >                         loader.load(recordBatch);
   >                     }
   >                     return true;
   >                 } else {
   >                     return false;
   >                 }
   >             }
   >         }
   >         return false;
   >     }
   >
   >     @Override
   >     public long bytesRead() {
   >         return 0;
   >     }
   >
   >     @Override
   >     protected void closeReadSource() throws IOException {
   >     }
   >
   >     @Override
   >     protected Schema readSchema() {
   >         return schema;
   >     }
   > }
   >
   > Error messages:
   >
   > Exception in thread "Thread-8" java.lang.IllegalStateException: RefCnt has gone negative
   > ...
   > Suppressed: java.lang.IllegalStateException: Allocator[allocatorParquetWrite] closed with outstanding buffers allocated (12).
   > Allocator(allocatorParquetWrite) 0/17746/51748/9223372036854775807 (res/actual/peak/limit)
   >   child allocators: 0
   > ...
   > Suppressed: java.lang.IllegalStateException: Allocator[allocatorJDBC] closed with outstanding buffers allocated (8).
   > Allocator(allocatorJDBC) 0/49760/99536/9223372036854775807 (res/actual/peak/limit)
   >   child allocators: 0
   > ...
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
