Some answers / ideas:
- The typical: write to Kafka
- The fashionable: Pravega (used with Apache Flink)
- The future: wait for erasure coding in HDFS 3
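To make the first option above concrete, here is a minimal sketch of buffering incoming records in Kafka instead of appending to a Parquet file, so that a separate batch job can later write properly sized Parquet files. It assumes the kafka-python package, a broker on localhost:9092, and a hypothetical "flight-states" topic; none of these come from the thread.

import json
from kafka import KafkaProducer  # assumption: kafka-python is installed

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # assumption: a local broker
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

# Each polled record is appended to the topic; a downstream batch job can
# later drain the topic and write a few large Parquet files.
producer.send("flight-states", {"icao24": "abc123", "velocity": 250.0})
producer.flush()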
On Wed, 19 Dec 2018 at 16:41, Wes McKinney <wesmck...@gmail.com> wrote:
> We could certainly develop some tools in C++ and/or Python to assist
> with the compaction workflows. If you have an idea about how these
> might look and be generally useful, please feel free to propose it in a
> JIRA issue.
>
> On Wed, Dec 19, 2018 at 9:09 AM Joel Pfaff <joel.pf...@gmail.com> wrote:
> >
> > Unfortunately I cannot use Kudu in my projects; I would have loved to
> > give it a try. I did not know about Hudi. It seems very similar to what
> > we do (Parquet + Avro), so I will have a look.
> > I am following the Iceberg project very closely, because it appears to
> > solve a lot of problems that we face on a regular basis.
> > I am really excited to learn that the Arrow and Iceberg projects could
> > work together, and I hope for a lot of good things to come out of these.
> >
> > On Wed, Dec 19, 2018 at 2:52 PM Uwe L. Korn <uw...@xhochy.com> wrote:
> > >
> > > This can also be solved by using a table format like
> > > https://github.com/uber/hudi or
> > > https://github.com/apache/incubator-iceberg, where the latter has a PR
> > > open for a basic Python implementation with pyarrow.
> > >
> > > These table formats support using Avro and Parquet seamlessly together
> > > without the reader needing to take care of the storage format.
> > >
> > > Uwe
> > >
> > > On 19.12.2018 at 14:47, Wes McKinney <wesmck...@gmail.com> wrote:
> > > >
> > > > This turns out to be a very common problem (landing incremental
> > > > updates, dealing with compaction and small files). It's part of the
> > > > reason that systems like Apache Kudu were developed, e.g.
> > > >
> > > > https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
> > > >
> > > > If you have to use file storage, then figuring out a scheme to compact
> > > > Parquet files (e.g. once per hour, once per day) will definitely be
> > > > worth it compared with using a slower file format (like Avro).
> > > >
> > > > - Wes
> > > >
> > > > On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pf...@gmail.com> wrote:
> > > > >
> > > > > Hello,
> > > > >
> > > > > For my company's use cases, we have found that the number of files
> > > > > was a critical part of the time spent building the execution plan,
> > > > > so we found the idea of very regularly writing small Parquet files
> > > > > to be rather inefficient.
> > > > >
> > > > > There are some formats that support an `append` semantic (I have
> > > > > tested successfully with Avro, but there are a couple of others
> > > > > that could be used similarly).
> > > > > So we had a few cases where we were aggregating data in a `current
> > > > > table` as a set of Avro files and rewriting all of it into a few
> > > > > Parquet files at the end of the day.
> > > > > This allowed us to prepare files optimized for query performance
> > > > > (file size, row group size, sorting per column) by maximizing the
> > > > > ability to benefit from the statistics.
> > > > > Our queries then did a UNION between the "optimized for speed"
> > > > > history tables and the "optimized for latency" current tables
> > > > > whenever the query timeframe crossed the boundary of the current day.
> > > > >
> > > > > Regards, Joel
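The compaction tooling Wes mentions and Joel's end-of-day rewrite both boil down to merging many small Parquet files into a few large ones. A rough sketch of that step with plain pyarrow, assuming hypothetical current_table/ and history/ directories, an illustrative row-group size, and identical schemas across the small files:

import glob
import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical layout: the "current table" is a directory of small files.
small_files = sorted(glob.glob("current_table/*.parquet"))
combined = pa.concat_tables([pq.read_table(path) for path in small_files])

# Rewrite as one file with large row groups, so readers plan over fewer
# files and can prune row groups using the column statistics.
pq.write_table(combined, "history/2018-12-19.parquet", row_group_size=1000000)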
> > > > > On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques <fsaintjacq...@networkdump.com> wrote:
> > > > > >
> > > > > > Hello Darren,
> > > > > >
> > > > > > What Uwe suggests is usually the way to go: your active process
> > > > > > writes to a new file every time. Then you have a parallel
> > > > > > process/thread that compacts the smaller files in the background,
> > > > > > so that you don't end up with too many files.
> > > > > >
> > > > > > On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uw...@xhochy.com> wrote:
> > > > > > >
> > > > > > > Hello Darren,
> > > > > > >
> > > > > > > you're out of luck here. Parquet files are immutable and meant
> > > > > > > for batch writes. Once they're written, you cannot modify them
> > > > > > > anymore. To load them, you need to know their metadata, which is
> > > > > > > stored in the footer. The footer is always at the end of the file
> > > > > > > and is written once you call close.
> > > > > > >
> > > > > > > Your use case is normally fulfilled by continuously starting new
> > > > > > > files and reading them back in using the ParquetDataset class.
> > > > > > >
> > > > > > > Cheers
> > > > > > > Uwe
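The writer side of the pattern Uwe and Francois describe can be sketched as follows: each fetched batch goes into its own Parquet file, which is closed immediately so that its footer is always valid for readers. The flights_dataset/ directory and the part-file naming are assumptions for illustration, not from the thread:

import os
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

os.makedirs("flights_dataset", exist_ok=True)  # hypothetical dataset directory

def write_batch(df: pd.DataFrame) -> None:
    table = pa.Table.from_pandas(df)
    # One file per batch; once write_table returns, the file is complete
    # and other processes can read it safely.
    path = os.path.join("flights_dataset",
                        "part-{}.parquet".format(int(time.time())))
    pq.write_table(table, path)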
> > > > > > > On 18.12.2018 at 21:03, Darren Gallagher <daz...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > [Cross posted from https://github.com/apache/arrow/issues/3203]
> > > > > > > >
> > > > > > > > I'm adding new data to a parquet file every 60 seconds using this code:
> > > > > > > >
> > > > > > > > import os
> > > > > > > > import json
> > > > > > > > import time
> > > > > > > > import requests
> > > > > > > > import pandas as pd
> > > > > > > > import numpy as np
> > > > > > > > import pyarrow as pa
> > > > > > > > import pyarrow.parquet as pq
> > > > > > > >
> > > > > > > > api_url = 'https://opensky-network.org/api/states/all'
> > > > > > > >
> > > > > > > > cols = ['icao24', 'callsign', 'origin', 'time_position',
> > > > > > > >         'last_contact', 'longitude', 'latitude',
> > > > > > > >         'baro_altitude', 'on_ground', 'velocity', 'true_track',
> > > > > > > >         'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> > > > > > > >         'spi', 'position_source']
> > > > > > > >
> > > > > > > > def get_new_flight_info(writer):
> > > > > > > >     print("Requesting new data")
> > > > > > > >     req = requests.get(api_url)
> > > > > > > >     content = req.json()
> > > > > > > >
> > > > > > > >     states = content['states']
> > > > > > > >     df = pd.DataFrame(states, columns=cols)
> > > > > > > >     df['timestamp'] = content['time']
> > > > > > > >     print("Found {} new items".format(len(df)))
> > > > > > > >
> > > > > > > >     table = pa.Table.from_pandas(df)
> > > > > > > >     if writer is None:
> > > > > > > >         writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
> > > > > > > >     writer.write_table(table=table)
> > > > > > > >     return writer
> > > > > > > >
> > > > > > > > if __name__ == '__main__':
> > > > > > > >     writer = None
> > > > > > > >     while not os.path.exists('opensky.STOP'):
> > > > > > > >         writer = get_new_flight_info(writer)
> > > > > > > >         time.sleep(60)
> > > > > > > >
> > > > > > > >     if writer:
> > > > > > > >         writer.close()
> > > > > > > >
> > > > > > > > This is working fine and the file grows every 60 seconds.
> > > > > > > > However, unless I force the loop to exit I am unable to use the
> > > > > > > > parquet file. In a separate terminal I try to access the parquet
> > > > > > > > file using this code:
> > > > > > > >
> > > > > > > > import pandas as pd
> > > > > > > > import pyarrow.parquet as pq
> > > > > > > >
> > > > > > > > table = pq.read_table("openskyflights.parquet")
> > > > > > > > df = table.to_pandas()
> > > > > > > > print(len(df))
> > > > > > > >
> > > > > > > > which results in this error:
> > > > > > > >
> > > > > > > > Traceback (most recent call last):
> > > > > > > >   File "checkdownloadsize.py", line 7, in <module>
> > > > > > > >     table = pq.read_table("openskyflights.parquet")
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> > > > > > > >     use_pandas_metadata=use_pandas_metadata)
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> > > > > > > >     filesystem=self)
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> > > > > > > >     self.validate_schemas()
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> > > > > > > >     self.schema = self.pieces[0].get_metadata(open_file).schema
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> > > > > > > >     return self._open(open_file_func).metadata
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> > > > > > > >     reader = open_file_func(self.path)
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> > > > > > > >     common_metadata=self.common_metadata)
> > > > > > > >   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> > > > > > > >     self.reader.open(source, metadata=metadata)
> > > > > > > >   File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> > > > > > > >   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > > > > > > > pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> > > > > > > >
> > > > > > > > Is there a way to achieve this?
> > > > > > > > I'm assuming that if I call writer.close() in the while loop then it
> > > > > > > > will prevent any further data being written to the file? Is there some
> > > > > > > > kind of "flush" operation that can be used to ensure all data is
> > > > > > > > written to disk and available to other processes or threads that want
> > > > > > > > to read the data?
> > > > > > > >
> > > > > > > > Thanks
> > > > > >
> > > > > > --
> > > > > > Sent from my jetpack.
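For completeness, a reader-side sketch of Uwe's ParquetDataset suggestion, assuming the writer rotates complete files into the same hypothetical flights_dataset/ directory as in the earlier sketch. Because only closed files are ever present in the directory, the corrupt-footer error above does not occur:

import pyarrow.parquet as pq

# Treat the directory of small, fully written Parquet files as one dataset.
dataset = pq.ParquetDataset("flights_dataset")  # hypothetical directory
table = dataset.read()
print(len(table.to_pandas()))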