davisusanibar commented on issue #315:
URL: https://github.com/apache/arrow-cookbook/issues/315#issuecomment-1647995983

   Hi @pronzato, version 13.0.0 is currently under [VOTE] Release Apache Arrow 13.0.0 - RC0. Let's push this into the next version, 14.0.0.
   
   > I managed to get it working with your guidance - looks great.
   
   @pronzato Would it be possible to share your code so we can turn it into a recipe as well?
   
   ---
   
   I am working on a recipe that also uses the JDBC Java module: it reads MySQL data, loads it into an ArrowReader, and then persists it to a Parquet file.
   
   **Parquet files are being created without issues**, but I am encountering errors such as the two below (a short sketch after this list shows what each message means):
   - Closed with outstanding buffers allocated ...
   - RefCnt has gone negative ...
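   
   Both messages come from Arrow Java's reference-counted allocators. Here is a minimal, self-contained sketch (class and variable names are mine, purely illustrative; the exact wording of the first message depends on whether allocator debug mode is enabled) that reproduces each one:
   
   ```java
   import org.apache.arrow.memory.ArrowBuf;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   
   public class AllocatorErrorDemo {
       public static void main(String[] args) {
           // Case 1: an allocator is closed while a buffer it handed out is still alive.
           BufferAllocator leaky = new RootAllocator();
           ArrowBuf stillAlive = leaky.buffer(16); // intentionally not released
           try {
               leaky.close(); // throws IllegalStateException reporting the outstanding buffer
           } catch (IllegalStateException e) {
               System.out.println(e.getMessage());
           }
           stillAlive.getReferenceManager().release(); // clean up after the demo
   
           // Case 2: a buffer is released more times than it was retained.
           try (BufferAllocator root = new RootAllocator()) {
               ArrowBuf buf = root.buffer(16);
               buf.getReferenceManager().release(); // refcount 1 -> 0: memory is freed
               try {
                   buf.getReferenceManager().release(); // refcount 0 -> -1
               } catch (IllegalStateException e) {
                   System.out.println(e.getMessage()); // "RefCnt has gone negative"
               }
           }
       }
   }
   ```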
   
   
   
   
   ```java
   import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
   import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
   import org.apache.arrow.adapter.jdbc.JdbcToArrow;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
   import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
   import org.apache.arrow.dataset.file.DatasetFileWriter;
   import org.apache.arrow.dataset.file.FileFormat;
   import org.apache.arrow.memory.BufferAllocator;
   import org.apache.arrow.memory.RootAllocator;
   import org.apache.arrow.vector.VectorLoader;
   import org.apache.arrow.vector.VectorSchemaRoot;
   import org.apache.arrow.vector.VectorUnloader;
   import org.apache.arrow.vector.ipc.ArrowReader;
   import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
   import org.apache.arrow.vector.types.pojo.Schema;
   import org.apache.ibatis.jdbc.ScriptRunner;
   
   import java.io.BufferedReader;
   import java.io.FileReader;
   import java.io.IOException;
   import java.sql.Connection;
   import java.sql.DriverManager;
   import java.sql.ResultSet;
   import java.sql.SQLException;
   import java.sql.Types;
   import java.util.HashMap;
   
   public class WriteArrowObjectsToParquet {
       public static void main(String[] args) throws Exception {
           String uri = "file:///Users/dsusanibar/ddsastorage/localarrowrepository/jarfromgithub/tmp/tocookbooks/src/main/resources/write/test3";
           try (
               // One child allocator per stage: JDBC read, reader copy, Parquet write.
               final BufferAllocator allocator = new RootAllocator();
               final BufferAllocator allocatorJDBC = allocator.newChildAllocator("allocatorJDBC", 0, Long.MAX_VALUE);
               final BufferAllocator allocatorReader = allocator.newChildAllocator("allocatorReader", 0, Long.MAX_VALUE);
               final BufferAllocator allocatorParquetWrite = allocator.newChildAllocator("allocatorParquetWrite", 0, Long.MAX_VALUE);
               final Connection connection = DriverManager.getConnection("jdbc:h2:mem:h2-jdbc-adapter")
           ) {
               // Create and populate the in-memory H2 database that stands in for MySQL.
               ScriptRunner runnerDDLDML = new ScriptRunner(connection);
               runnerDDLDML.setLogWriter(null);
               runnerDDLDML.runScript(new BufferedReader(
                       new FileReader("./src/main/resources/h2-ddl.sql")));
               runnerDDLDML.runScript(new BufferedReader(
                       new FileReader("./src/main/resources/h2-dml.sql")));
               // Convert the JDBC result set to Arrow in batches of two rows.
               JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocatorJDBC,
                       JdbcToArrowUtils.getUtcCalendar())
                       .setTargetBatchSize(2)
                       .setArraySubTypeByColumnNameMap(
                               new HashMap<String, JdbcFieldInfo>() {{
                                   put("LIST_FIELD19",
                                           new JdbcFieldInfo(Types.INTEGER));
                               }}
                       )
                       .build();
               String query = "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1";
               try (
                   final ResultSet resultSetConvertToParquet = connection.createStatement().executeQuery(query);
                   final ResultSet resultSetForSchema = connection.createStatement().executeQuery(query);
                   final ArrowVectorIterator arrowVectorIterator = JdbcToArrow.sqlToArrowVectorIterator(
                           resultSetConvertToParquet, config)
               ) {
                   // Probe the schema with a second iterator (its root is not closed here).
                   Schema schema = JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema();
                   try (
                       final JDBCReader arrowReader = new JDBCReader(allocatorReader, arrowVectorIterator, schema)
                   ) {
                       DatasetFileWriter.write(allocatorParquetWrite, arrowReader, FileFormat.PARQUET, uri);
                   }
               }
               runnerDDLDML.closeConnection();
           } catch (SQLException | IOException e) {
               e.printStackTrace();
           }
       }
   }
   
   // ArrowReader adapter that feeds JDBC-produced batches to DatasetFileWriter.
   class JDBCReader extends ArrowReader {
       private final ArrowVectorIterator iter;
       private final Schema schema;
   
       public JDBCReader(BufferAllocator allocator, ArrowVectorIterator iter, Schema schema) {
           super(allocator);
           this.iter = iter;
           this.schema = schema;
       }
   
       @Override
       public boolean loadNextBatch() throws IOException {
           while (iter.hasNext()) {
               try (VectorSchemaRoot rootTmp = iter.next()) {
                   if (rootTmp.getRowCount() > 0) {
                       // Copy the batch from the iterator's root into this reader's root.
                       VectorUnloader unloader = new VectorUnloader(rootTmp);
                       VectorLoader loader = new VectorLoader(super.getVectorSchemaRoot());
                       try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
                           loader.load(recordBatch);
                       }
                       return true;
                   } else {
                       return false;
                   }
               }
           }
           return false;
       }
   
       @Override
       public long bytesRead() {
           return 0;
       }
   
       @Override
       protected void closeReadSource() throws IOException {
       }
   
       @Override
       protected Schema readSchema() {
           return schema;
       }
   }
   ```
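   
   One likely contributor: the schema probe above (`JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config).next().getSchema()`) never closes the `VectorSchemaRoot` returned by `next()`, nor the iterator itself, so its buffers stay allocated in `allocatorJDBC`. A possible (untested) rework of just that step:
   
   ```java
   // Hypothetical variant of the schema probe: close both the iterator and the
   // first root so allocatorJDBC is not left holding their buffers.
   Schema schema;
   try (ArrowVectorIterator schemaIterator =
            JdbcToArrow.sqlToArrowVectorIterator(resultSetForSchema, config);
        VectorSchemaRoot firstRoot = schemaIterator.next()) {
       schema = firstRoot.getSchema();
   }
   ```
   
   The `RefCnt has gone negative` raised on `Thread-8` may be a separate problem in the write path.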
   
   Error messages:
   
   ```
   Exception in thread "Thread-8" java.lang.IllegalStateException: RefCnt has gone negative
   ...
   Suppressed: java.lang.IllegalStateException: Allocator[allocatorParquetWrite] closed with outstanding buffers allocated (12).
   Allocator(allocatorParquetWrite) 0/17746/51748/9223372036854775807 (res/actual/peak/limit)
     child allocators: 0
   ...
   Suppressed: java.lang.IllegalStateException: Allocator[allocatorJDBC] closed with outstanding buffers allocated (8).
   Allocator(allocatorJDBC) 0/49760/99536/9223372036854775807 (res/actual/peak/limit)
     child allocators: 0
   ...
   ```
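   
   To narrow down which allocations are still alive, each allocator can be dumped just before its try-with-resources block closes it, for example:
   
   ```java
   // Illustrative: print the outstanding buffers/ledgers of the child allocators
   // right before they close, to identify which allocations were never released.
   System.out.println(allocatorJDBC.toVerboseString());
   System.out.println(allocatorParquetWrite.toVerboseString());
   ```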
   
   
   
   

