Hi Fernando.

There are two independent use cases for input_file_name():
1. A lot of data still arrives as CSV, which we convert to Parquet after
applying careful data typing rules. We then materialize input_file_name
as a column in the Parquet output so that we can trace the lineage of the
data. For this use case, exposing a function similar to Spark's
monotonically_increasing_id() would also be helpful.
2. For DeltaLake (https://delta.io/), which behaves like an append-only
filesystem, the MERGE operation basically works by doing a join against the
data on disk and using the input_file_name function to determine which
target files have been impacted by the join and need to be rewritten. The
upside is that you get 'transactional' database-style guarantees and the
ability to time-travel through versioned datasets, but it requires some
relatively low-level access to what the Parquet reader is doing (the filename).
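
To make the second use case concrete, here is a minimal sketch in plain Rust
(std only, no Arrow or DataFusion types) of how a MERGE could work out which
target files need rewriting. The TargetRow struct and the files_to_rewrite
function are hypothetical names invented for this illustration; they are not
DeltaLake's or DataFusion's actual API, and a real implementation would do
this as a join over record batches rather than over a slice of structs.

```rust
use std::collections::{BTreeSet, HashSet};

/// A target row as a reader might surface it: a join key plus the name of
/// the file the row was read from (the value input_file_name() would return).
/// Hypothetical type, for illustration only.
struct TargetRow {
    key: i64,
    input_file_name: String,
}

/// Returns the distinct target files that hold at least one row whose key
/// matches a key in the incoming source data. In a MERGE, these are the
/// files that would have to be rewritten.
fn files_to_rewrite(target: &[TargetRow], source_keys: &[i64]) -> BTreeSet<String> {
    // Build a lookup set so each target row is checked in constant time.
    let keys: HashSet<i64> = source_keys.iter().copied().collect();
    target
        .iter()
        .filter(|row| keys.contains(&row.key))
        .map(|row| row.input_file_name.clone())
        .collect()
}
```

Calling files_to_rewrite with the rows read from disk and the keys of the
incoming data gives the set of file names touched by the join; files that are
not in the set can be left untouched.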

Thanks
Mike

On Thu, Feb 25, 2021 at 10:09 PM Fernando Herrera <
fernando.j.herr...@gmail.com> wrote:

> I see. You are storing the file name when reading a JSON, CSV, or Parquet
> file.
>
> Just out of curiosity, how would you use the file name in Spark?
> Are you using it for file statistics?
>
> On Thu, Feb 25, 2021 at 9:36 AM Mike Seddon <seddo...@gmail.com> wrote:
>
> > Hi Fernando,
> >
> > After Andrew's reply I have moved the filename metadata into the Schema
> > and actually changed the ScalarFunctionImplementation signature to:
> > Arc<dyn Fn(&[ColumnarValue], SchemaRef) -> Result<ColumnarValue> + Send + Sync>;
> >
> > I have a functional (WIP) repo already:
> > https://github.com/seddonm1/arrow/compare/master...seddonm1:input-file
> >
> > I need to add some more tests (mainly to ensure that multipart Parquet
> > works as expected), but I wanted to gather feedback on the proposal before
> > cleaning up for a PR.
> >
> > Mike
> >
> > On Thu, Feb 25, 2021 at 8:30 PM Fernando Herrera <
> > fernando.j.herr...@gmail.com> wrote:
> >
> > > Hi Mike,
> > >
> > > I've been thinking about how you are considering adding metadata to the
> > > RecordBatch.
> > >
> > > The struct is currently defined as
> > >
> > > > pub struct RecordBatch {
> > > >     schema: SchemaRef,
> > > >     columns: Vec<Arc<Array>>,
> > > > }
> > >
> > >
> > > Are you suggesting something like this?
> > >
> > > > pub struct RecordBatch {
> > > >     schema: SchemaRef,
> > > >     columns: Vec<Arc<Array>>,
> > > >     metadata: RecordBatchMetadata,
> > > > }
> > >
> > >
> > > Because if that is the case, why not use the Schema metadata? The schema
> > > metadata is a HashMap that can hold any information you require.
> > >
> > > For example, here I'm using the metadata to create a schema and then
> > > using that schema with IPC and the RecordBatch:
> > >
> > > > use std::collections::HashMap;
> > > > use std::net::TcpStream;
> > > > use std::sync::Arc;
> > > >
> > > > use arrow::array::{Int32Array, StringArray};
> > > > use arrow::datatypes::{DataType, Field, Schema};
> > > > use arrow::ipc::writer::StreamWriter;
> > > > use arrow::record_batch::RecordBatch;
> > > >
> > > > fn main() {
> > > >     // Carry the source file name in the schema-level metadata.
> > > >     let mut schema_metadata: HashMap<String, String> = HashMap::new();
> > > >     schema_metadata.insert("file_name".to_string(), "my_file.parquet".to_string());
> > > >     let schema = Schema::new_with_metadata(
> > > >         vec![
> > > >             Field::new("index", DataType::Int32, false),
> > > >             Field::new("word", DataType::Utf8, false),
> > > >         ],
> > > >         schema_metadata,
> > > >     );
> > > >     let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
> > > >     let b = StringArray::from(vec!["one", "two", "three", "four", "five"]);
> > > >     let batch = RecordBatch::try_new(
> > > >         Arc::new(schema.clone()),
> > > >         vec![Arc::new(a), Arc::new(b)],
> > > >     )
> > > >     .unwrap();
> > > >
> > > >     // Send the schema (including its metadata) and the batch over IPC.
> > > >     let stream = TcpStream::connect("127.0.0.1:8000").unwrap();
> > > >     let mut writer = StreamWriter::try_new(stream, &schema).unwrap();
> > > >     writer.write(&batch).unwrap();
> > > >     writer.finish().unwrap();
> > > > }
> > >
> > >
> > > I don't know how the Spark implementation works, but I think it would be
> > > possible to emulate it using this metadata. Don't you think so?
> > >
> > > Fernando
> > >
> > > On Thu, Feb 25, 2021 at 4:36 AM Mike Seddon <seddo...@gmail.com>
> wrote:
> > >
> > > > Thanks Micah.
> > > >
> > > > It is actually the Rust implementation that is the odd one out. Ideally,
> > > > adding a metadata KeyValue to the RecordBatch plus your suggested
> > > > 'reserved' key would be the best option.
> > > >
> > > > On Thu, Feb 25, 2021 at 3:26 PM Micah Kornfield <
> emkornfi...@gmail.com
> > >
> > > > wrote:
> > > >
> > > > > Thanks for looking into it. I would guess it is likely possible to
> > > > > "hoist" metadata from a record batch schema object to the Message, but
> > > > > I understand if it isn't something you want to pursue.
> > > > >
> > > > > On Wed, Feb 24, 2021 at 8:19 PM Mike Seddon <seddo...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Hi Micah,
> > > > >> Thank you for providing this information. I have reviewed the
> > > > >> documentation you provided and have a few conclusions:
> > > > >>
> > > > >> 1. RecordBatch does not have the capability to attach user-defined
> > > > >> metadata (KeyValue attributes):
> > > > >> https://github.com/apache/arrow/blob/master/format/Message.fbs#L83
> > > > >> 2. Schema does have this capability, but it would not work for passing
> > > > >> per-batch input files, as the design indicates that the Schema object
> > > > >> would be passed once and then a series of interleaved DictionaryBatch
> > > > >> or RecordBatch messages must conform to the Schema:
> > > > >> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst#ipc-streaming-format
> > > > >>
> > > > >> In the Rust implementation each RecordBatch embeds the Schema, so
> > > > >> each schema can have different metadata (like the filename in this
> > > > >> case). I think this will have to be implemented in Rust as a
> > > > >> memory-only attribute which does not get persisted unless more
> > > > >> significant changes to the protocol are made.
> > > > >>
> > > > >> Thanks
> > > > >> Mike
> > > > >>
> > > > >> On Thu, Feb 25, 2021 at 11:14 AM Micah Kornfield <
> > > emkornfi...@gmail.com
> > > > >
> > > > >> wrote:
> > > > >>
> > > > >>> The process would be to create a PR proposing an update to the
> > > > >>> custom metadata specification [1] to reserve a new word and describe
> > > > >>> its use. Then send a [DISCUSS] email on this list. Once there is
> > > > >>> consensus we can formally vote and merge the change.
> > > > >>>
> > > > >>> [1]
> > > > >>> https://github.com/apache/arrow/blob/master/docs/source/format/Columnar.rst
> > > > >>>
> > > > >>> On Wed, Feb 24, 2021 at 3:47 PM Mike Seddon <seddo...@gmail.com>
> > > > wrote:
> > > > >>>
> > > > >>> > Thanks for both of your comments.
> > > > >>> >
> > > > >>> > @Andrew Schema.metadata does look like a logical place to house the
> > > > >>> > information, so that would solve part of the problem. Do you have any
> > > > >>> > thoughts on whether we should change the function signature:
> > > > >>> >
> > > > >>> > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > > > >>> > To:   Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > > > >>> >
> > > > >>> > @Micah It would be nice to have a reserved metadata key so this could
> > > > >>> > be shared, but I am not sure of the admin process for the Arrow
> > > > >>> > project to agree on something like that. Is there a forum?
> > > > >>> >
> > > > >>> > On Thu, Feb 25, 2021 at 8:58 AM Micah Kornfield <
> > > > emkornfi...@gmail.com
> > > > >>> >
> > > > >>> > wrote:
> > > > >>> >
> > > > >>> > > At least in C++ (and the IPC format), a schema can be shared across
> > > > >>> > > many RecordBatches, which might have different sources.
> > > > >>> > >
> > > > >>> > > It might be useful to define a reserved metadata key (similar to
> > > > >>> > > extension types) so that the data can be interpreted consistently.
> > > > >>> > >
> > > > >>> > > On Wed, Feb 24, 2021 at 11:29 AM Andrew Lamb <
> > > al...@influxdata.com
> > > > >
> > > > >>> > wrote:
> > > > >>> > >
> > > > >>> > > > I wonder if you could add the file_name as metadata on the `Schema`
> > > > >>> > > > of the RecordBatch rather than the RecordBatch itself? Since every
> > > > >>> > > > RecordBatch has a schema, I don't fully understand the need to add
> > > > >>> > > > something additional to the RecordBatch.
> > > > >>> > > >
> > > > >>> > > > https://docs.rs/arrow/3.0.0/arrow/datatypes/struct.Schema.html#method.new_with_metadata
> > > > >>> > > >
> > > > >>> > > > On Wed, Feb 24, 2021 at 1:20 AM Mike Seddon <
> > > seddo...@gmail.com>
> > > > >>> > wrote:
> > > > >>> > > >
> > > > >>> > > > > Hi,
> > > > >>> > > > >
> > > > >>> > > > > One of Apache Spark's very useful SQL functions is the
> > > > >>> > > > > 'input_file_name' function, which provides a simple API for
> > > > >>> > > > > identifying the source of a row of data when it comes from a
> > > > >>> > > > > file-based source like Parquet or CSV. This is particularly
> > > > >>> > > > > useful for identifying which chunk/partition of a Parquet
> > > > >>> > > > > dataset the row came from and is used heavily by the DeltaLake
> > > > >>> > > > > format to determine which files are impacted by MERGE operations.
> > > > >>> > > > >
> > > > >>> > > > > I have built a functional proof-of-concept for DataFusion, but it
> > > > >>> > > > > requires modifying the RecordBatch struct to include a 'metadata'
> > > > >>> > > > > struct (RecordBatchMetadata) to carry the source file name
> > > > >>> > > > > attached to each batch.
> > > > >>> > > > >
> > > > >>> > > > > It also requires changing the ScalarFunctionImplementation
> > > > >>> > > > > signature to support exposing the metadata (and therefore changing
> > > > >>> > > > > all the functions).
> > > > >>> > > > >
> > > > >>> > > > > From: Arc<dyn Fn(&[ColumnarValue]) -> Result<ColumnarValue> + Send + Sync>;
> > > > >>> > > > > To:   Arc<dyn Fn(&[ColumnarValue], RecordBatchMetadata) -> Result<ColumnarValue> + Send + Sync>;
> > > > >>> > > > >
> > > > >>> > > > > These changes have been made in a personal feature branch and are
> > > > >>> > > > > available for review (it still needs cleaning), but conceptually
> > > > >>> > > > > does anyone have a problem with this API change or have a better
> > > > >>> > > > > proposal?
> > > > >>> > > > >
> > > > >>> > > > > Thanks
> > > > >>> > > > > Mike
> > > > >>> > > > >
> > > > >>> > > >
> > > > >>> > >
> > > > >>> >
> > > > >>>
> > > > >>
> > > >
> > >
> >
>
