[
https://issues.apache.org/jira/browse/ARROW-16822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555267#comment-17555267
]
Weston Pace commented on ARROW-16822:
-------------------------------------
Can you raise a different error other than SystemExit or provide a traceback?
This is a rather large snippet of code to parse through to figure out which
line might be failing. Also, the exit code you are mentioning (139) does not
seem like something that pyarrow would configure. Pyarrow doesn't really
interact with exit codes.
> Python Error: <>, exitCode: <139> when csv file converting parquet using
> pandas/pyarrow libraries
> -------------------------------------------------------------------------------------------------
>
> Key: ARROW-16822
> URL: https://issues.apache.org/jira/browse/ARROW-16822
> Project: Apache Arrow
> Issue Type: Bug
> Components: Parquet, Python
> Affects Versions: 5.0.0
> Reporter: Mahesha Subrahamanya
> Priority: Blocker
>
> Our main requirement is to read source file (structured/semi structured
> /unstructured) which are residing in AWS s3 through AWS redshift database,
> where our customer have direct access to analyze the data very
> quickly/seamlessly for reporting purpose without defining the schema info for
> the file.
> We have created an data lake (aws s3) workspace where our customers dumps
> csv/parquet huge size files (like 10/15 GB). We have developed a framework
> which is consuming pandas/pyarrow (parquet) libraries to read source files in
> chunking manner and identifying schema meaning (datatype/length) and push it
> to AWS Glue where AWS redshift database can talk seamlessly to s3 files can
> read very quickly.
>
> Following is the snippet of parquet conversion where i'm getting this error.
> Please take a look
>
> read_csv_args = \{'filepath_or_buffer': src_object, 'chunksize':
> self.chunkSizeLimit, 'encoding': 'UTF-8','on_bad_lines': 'error','sep':
> fileDelimiter, 'low_memory': False, 'skip_blank_lines': True, 'memory_map':
> True} # 'verbose': True , In order to enable memory consumption logging
>
> if srcPath.endswith('.gz'):
> read_csv_args['compression'] = 'gzip'
> if fileTextQualifier:
> read_csv_args['quotechar'] = fileTextQualifier
> with pd.read_csv(**read_csv_args) as reader:
> for chunk_number, chunk in enumerate(reader, 1):
> # To support shape-shifting for the incoming datafiles,
> need to make sure match file with number of columns if not delete
> if glueMasterSchema is not None:
> sessionSchema=copy.deepcopy(glueMasterSchema)
> #copying using deepcopy() method
> chunk.columns = chunk.columns.str.lower() # modifying
> the column header of all columns to lowercase
> fileSchema = list(chunk.columns)
> for key in list(sessionSchema):
> if key not in fileSchema:
> del sessionSchema[key]
> fields = []
> for col,dtypes in sessionSchema.items():
> fields.append(pa.field(col, dtypes))
> glue_schema = pa.schema(fields)
> # To identify the boolean datatype and convert back
> to STRING which was done during the BF schema
> for cols in chunk.columns:
> try:
> if chunk[cols].dtype =='bool':
> chunk[cols] = chunk[cols].astype('str')
> if chunk[cols].dtype =='object':
> chunk[cols] =
> chunk[cols].fillna('').astype('str').tolist()
> except (ParserError,ValueError,TypeError):
> pass
> log.debug("chunk count", chunk_number, "chunk length",
> len(chunk), 'glue_schema', glue_schema, 'Wrote file', targetKey)
> #log.debug("during pandas chunk data ", chunk,"df
> schemas:", chunk.dtypes)
> table = pa.Table.from_pandas(chunk, schema=glue_schema ,
> preserve_index=False)
> log.info('Glue schema:',glue_schema,'for a
> [file:',targetKey|file:///',targetKey])
> log.info('pandas memory utilization during chunk process:
> ', chunk.memory_usage().sum(), 'Bytes.','\n\n\n')
> # Guess the schema of the CSV file from the first chunk
> #if pq_writer is None:
> if chunk_number == 1:
> #parquet_schema = table.schema
> # Open a Parquet file for writing
> pq_writer = pq.ParquetWriter(targetKey,
> schema=glue_schema, compression='snappy') # In PyArrow we use, Snappy
> generally results in better performance
> log.debug("table schema :",
> pprint.pformat(table.schema).replace('\n', ',').replace('\r', ','),' for:',
> inputFileName)
> # writing the log information into s3://etl_activity
> etlActivityLog.append({'tableObjectName':
> targetDirectory[:-1], 'sourceFileName': inputFileName, 'targetFileName':
> parquetFileName, 'message': 'File Converted Successfully', 'number of rows
> processed': str(table.num_rows), 'fileStatus': 'SUCCESS'})
> logInfo = self.read_logInfo(etlActivityLog)
> self.s3Handle.putObject(s3Client,
> 'etl_process_all.json', logInfo, bucketName, self.etlJobActivityLogFolder )
> # Write CSV chunk to the parquet file
> pq_writer.write_table(table)
> i += 1
> log.info( 'chunk count:', i, 'for a given
> [file:',targetKey,'whitelist:',targetDirectory[:-1|file:///',targetKey,'whitelist:',targetDirectory[:-1]])
> # Close a Parquet file writer
> if pq_writer is not None and pq_writer.is_open:
> pq_writer.close()
> pq_writer = None
> s3key = outputDirectory + targetDirectory + parquetFileName
> self.s3Handle.waitForFile(s3Client, bucketName, s3key)
> log.info('Metadata info:', table.column_names, 'number of
> columns:', table.num_columns, 'number of rows:', table.num_rows, 'Glue Object
> Name:', targetDirectory[:-1])
> log.debug('Wrote file', targetKey, 'with chunk count:',
> chunk_number)
> log.debug('Stream copy', targetKey, 'to parquet took:',
> datetime.now() - start_time)
> log.info('Final parquert convert:',sys.exc_info())
> except (EOFError, IOError) as x:
> log.error("error in source file for EOFError, IOError" % x)
> raise SystemExit('convert2Parquet EOFError:'+sys.exc_info())
> except (ValueError, ParserError) as x:
> log.error("error in source for ValueError, ParserError" % x)
> raise SystemExit('convert2Parquet valueError:'+sys.exc_info())
>
> finally:
> if pq_writer is not None and pq_writer.is_open:
> pq_writer.close()
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)