You also don't need to use year, month, and day; you can just use day. The time-based partition functions all produce ordinals, not local values: month(Jan 1970) = 0 and month(Jan 1972) = 24. The same is true for day and hour, so a day partition already implies the month and year. In fact, I should open a PR to throw an exception when a spec contains redundant time-based partition functions like this...
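To make the ordinal point concrete, here is a minimal sketch (plain java.time arithmetic, not the Iceberg transform implementation itself) of how the time-based partition transforms map dates to ordinals counted from the Unix epoch:

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class PartitionOrdinals {
  private static final LocalDate EPOCH = LocalDate.of(1970, 1, 1);

  // month(ts): whole months elapsed since 1970-01
  static long monthOrdinal(LocalDate date) {
    return ChronoUnit.MONTHS.between(EPOCH, date.withDayOfMonth(1));
  }

  // day(ts): whole days elapsed since 1970-01-01
  static long dayOrdinal(LocalDate date) {
    return ChronoUnit.DAYS.between(EPOCH, date);
  }

  public static void main(String[] args) {
    System.out.println(monthOrdinal(LocalDate.of(1970, 1, 15))); // 0
    System.out.println(monthOrdinal(LocalDate.of(1972, 1, 15))); // 24
    System.out.println(dayOrdinal(LocalDate.of(2019, 1, 1)));    // 17897
  }
}
```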
On Thu, May 2, 2019 at 1:52 PM Gautam <[email protected]> wrote: > FYI .. The test Partition Spec is: > [ > YEAR: identity(21) > MONTH: identity(22) > DAY: identity(23) > batchId: identity(24) > ] > > > > On Thu, May 2, 2019 at 1:46 PM Gautam <[email protected]> wrote: > >> > Using those, you should be able to control parallelism. If you want to >> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t >> compact manifests. >> >> This is helpful. Thanks for the pointer on increasing parallelism. Will >> try this out. So that I understand the behaviour: if a different dataset has >> >=5000 batches, would the resultant number of manifests be (total_num_batches >> % 5000)? >> >> > What surprises me is that you’re not getting much benefit from >> filtering out manifests that aren’t helpful. We see a lot of benefit from >> it. >> >> Pardon the verbose example, but I think it'll help explain what I'm seeing >> .. >> >> Regarding manifest filtering: I tested whether partition filters in a SQL query >> actually reduce the manifests being inspected. In my example, I have 16 >> manifests that point to 4000 batch partitions (each file is restricted to >> one partition as we're using physical partitioning in the table). So when >> querying for .. WHERE batchId = 'xyz' .. at most 1 manifest should be >> read because 1 batch == 1 file, which should be tracked by 1 manifest (among the >> 16), right? But I see that all 16 are being inspected in >> BaseTableScan.planFiles(). Correct me if I'm wrong, it's this call [1] >> that should be giving me the manifests that match a partition. When I >> inspect this it says `matchingManifests = 16`, which is all the >> manifests in the table. This *could* be due to the fact that our batch >> IDs are random UUIDs, so lower/upper bounds may not help because there's no >> inherent ordering amongst batches. >> But then I tried year = 2019 and month = 01 and day = 01, which also >> scanned all manifests. Could this be due to the way Iceberg manifests are >> re-grouped and merged? If so, shouldn't re-grouping honour partition >> boundaries and optimize for it? >> >> >> Cheers, >> -Gautam. >> >> [1] - >> https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173 >> >> >> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <[email protected]> wrote: >> >>> Good questions. Grouping manifests is configurable at the table level. >>> There are 2 settings: >>> >>> - commit.manifest.target-size-bytes defaults to 8MB; this is the >>> target size that Iceberg will compact to >>> - commit.manifest.min-count-to-merge defaults to 100; this is the >>> minimum number of files before a compaction is triggered >>> >>> Using those, you should be able to control parallelism. If you want to >>> test with 4,000, then you can set the min count to 5,000 so Iceberg won’t >>> compact manifests. >>> >>> What surprises me is that you’re not getting much benefit from filtering >>> out manifests that aren’t helpful. We see a lot of benefit from it. You >>> might try sorting the data files by partition before adding them to the >>> table. That will cluster data files in the same partition so you can read >>> fewer manifests. >>> >>> On Thu, May 2, 2019 at 12:09 PM Gautam <[email protected]> wrote: >>> >>>> Hey Anton, >>>> Sorry about the delay on this. Been caught up with some >>>> other things. Thanks for raising issue #173. >>>> >>>> So the root cause is indeed the density and size of the schema.
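For reference, a minimal sketch of how the two commit properties listed above can be raised so that appends keep their manifests separate; the property names and defaults come from the thread, while the `table` handle and the chosen values are assumptions:

```java
import org.apache.iceberg.Table;

public class ManifestMergeSettings {
  // `table` is assumed to be loaded from a catalog (e.g. a Hive catalog or HadoopTables).
  public static void keepManifestsSeparate(Table table) {
    table.updateProperties()
        // minimum number of manifests before Iceberg merges them (default 100)
        .set("commit.manifest.min-count-to-merge", "5000")
        // target size Iceberg compacts manifests to (default is 8 MB)
        .set("commit.manifest.target-size-bytes", String.valueOf(8L * 1024 * 1024))
        .commit();
  }
}
```

Raising the merge threshold above the expected manifest count trades more manifest files per scan for more parallelism during planning.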
While I >>>> agree the option to configure stats for columns is good, I'm not >>>> fully convinced that this is purely due to lower/upper bounds. For >>>> instance, maybe it's just taking a while to iterate over manifest rows and >>>> deserialize the DataFile stats in each read? The solution I'm using right >>>> now is to parallelize the manifest reading in split planning. We >>>> regenerated the Iceberg table with more manifests. Now the code enables the >>>> ParallelIterator, which uses a worker pool of threads (1 thread per CPU by >>>> default, configurable using 'iceberg.worker.num-threads') to read >>>> manifests. >>>> >>>> On that note, the ability to parallelize is limited to how many >>>> manifests are in the table. So as a test, for a table with 4000 files we >>>> created one manifest per file (think of one file as a single batch commit >>>> in this case). So I was hoping to get a parallelism factor of 4000. But >>>> Iceberg summarizes manifests into fewer manifests with each commit, so we >>>> instead ended up with 16 manifests. So now split planning is limited to >>>> at most 16 units of parallelism. Is this grouping of manifests into >>>> fewer manifests configurable? If not, should we allow making it configurable? >>>> >>>> Sorry if this is forking a different conversation. If so, I can start a >>>> separate conversation thread on this. >>>> >>>> >>>> >>>> >>>> >>>> >>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <[email protected]> >>>> wrote: >>>> >>>>> Hey Gautam, >>>>> >>>>> Out of curiosity, did you manage to confirm the root cause of the >>>>> issue? >>>>> >>>>> P.S. I created [1] so that we can make collection of lower/upper >>>>> bounds configurable. >>>>> >>>>> Thanks, >>>>> Anton >>>>> >>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173 >>>>> >>>>> On 22 Apr 2019, at 09:15, Gautam <[email protected]> wrote: >>>>> >>>>> Thanks guys for the insights .. >>>>> >>>>> > I like Anton's idea to have an optional list of columns for which we >>>>> keep stats. That would allow us to avoid storing stats for thousands of >>>>> columns that won't ever be used. Another option here is to add a flag to >>>>> keep stats only for top-level columns. That's much less configuration for >>>>> users and probably does the right thing in many cases. Simpler to use but >>>>> not as fast in all cases is sometimes a good compromise. >>>>> >>>>> This makes sense to me. It adds a variable that data pipelines can >>>>> tweak to improve performance. I will add an issue on GitHub to add a >>>>> stats config/flag. Although, having said that, I would try to optimize >>>>> around this because read patterns are hardly ever known a priori, and adding a >>>>> column to this list means having to re-write the entire dataset again. So I'll >>>>> try the other suggestion, which is parallelizing on multiple manifests. >>>>> >>>>> > To clarify my comment on changing the storage: the idea is to use >>>>> separate columns instead of a map and then use a columnar storage format >>>>> so >>>>> we can project those columns independently. Avro can't project columns >>>>> independently. This wouldn't help on the write side and may just cause a >>>>> lot of seeking on the read side that diminishes the benefits. >>>>> >>>>> Gotcha. >>>>> >>>>> > Also, now that we have more details, I think there is a second >>>>> problem. Because we expect several manifests in a table, we parallelize >>>>> split planning on manifests instead of splits of manifest files.
This >>>>> planning operation is happening in a single thread instead of in parallel. >>>>> I think if you split the write across several manifests, you'd improve >>>>> wall >>>>> time. >>>>> >>>>> This might actually be the issue here: this was a test-bench dataset, >>>>> so the writer job created a single manifest for all the data in the >>>>> dataset, >>>>> which isn't really how we will do things in prod. I'll try to create the >>>>> metadata based on production's expected commit pattern. >>>>> >>>>> >>>>> Regarding Iceberg not truncating large bounded column values >>>>> https://github.com/apache/incubator-iceberg/issues/113 .. I didn't >>>>> consider this with our dataset. The current evidence is leading towards >>>>> the >>>>> number of columns and the sheer number of files that the manifest is >>>>> maintaining, but this is a good thing to look into. >>>>> >>>>> Thanks again, guys. >>>>> >>>>> -Gautam. >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <[email protected]> wrote: >>>>> >>>>>> I like Anton's idea to have an optional list of columns for which we >>>>>> keep stats. That would allow us to avoid storing stats for thousands of >>>>>> columns that won't ever be used. Another option here is to add a flag to >>>>>> keep stats only for top-level columns. That's much less configuration for >>>>>> users and probably does the right thing in many cases. Simpler to use but >>>>>> not as fast in all cases is sometimes a good compromise. >>>>>> >>>>>> To clarify my comment on changing the storage: the idea is to use >>>>>> separate columns instead of a map and then use a columnar storage format >>>>>> so >>>>>> we can project those columns independently. Avro can't project columns >>>>>> independently. This wouldn't help on the write side and may just cause a >>>>>> lot of seeking on the read side that diminishes the benefits. >>>>>> >>>>>> Also, now that we have more details, I think there is a second >>>>>> problem. Because we expect several manifests in a table, we parallelize >>>>>> split planning on manifests instead of splits of manifest files. This >>>>>> planning operation is happening in a single thread instead of in >>>>>> parallel. >>>>>> I think if you split the write across several manifests, you'd improve >>>>>> wall >>>>>> time. >>>>>> >>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> No, we haven’t experienced it yet. The manifest size is huge in your >>>>>>> case. To me, Ryan is correct: it might be either big lower/upper bounds >>>>>>> (then truncation will help) or a big number of columns (then collecting >>>>>>> lower/upper bounds only for specific columns will help). I think both >>>>>>> optimizations are needed and will reduce the manifest size. >>>>>>> >>>>>>> Since you mentioned you have a lot of columns and we collect bounds >>>>>>> for nested struct fields, I am wondering if you could revert [1] locally >>>>>>> and compare the manifest size. >>>>>>> >>>>>>> [1] - >>>>>>> https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f >>>>>>> >>>>>>> On 19 Apr 2019, at 15:42, Gautam <[email protected]> wrote: >>>>>>> >>>>>>> Thanks for responding, Anton! Do we think the delay is mainly due to >>>>>>> lower/upper bound filtering? Have you faced this? I haven't exactly >>>>>>> found >>>>>>> where the slowness is yet. It's generally due to the stats filtering, but >>>>>>> I'm not sure what part of it is causing this much network traffic.
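For reference, a minimal sketch (the table handle and the batchId literal are assumptions) of the planning path being discussed: planFiles() is the call that opens the matching manifests, fanning the reads out over the iceberg.worker.num-threads pool when there is more than one manifest, and evaluating the partition and column-stats filters against each DataFile entry:

```java
import java.io.IOException;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;

public class PlanWithFilter {
  public static void printMatchingFiles(Table table) throws IOException {
    // The filter is pushed into manifest selection and per-file stats evaluation during planFiles().
    try (CloseableIterable<FileScanTask> tasks =
             table.newScan()
                 .filter(Expressions.equal("batchId", "4a6f95abac924159bb3d7075373395c9"))
                 .planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path());
      }
    }
  }
}
```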
There's >>>>>>> CloseableIterable that takes a ton of time on the next() and >>>>>>> hasNext() >>>>>>> calls. My guess is the expression evaluation on each manifest entry is >>>>>>> what's doing it. >>>>>>> >>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> I think we need to have a list of columns for which we want to >>>>>>>> collect stats, and that should be configurable by the user. Maybe this >>>>>>>> config should be applicable only to lower/upper bounds. As we now >>>>>>>> collect >>>>>>>> stats even for nested struct fields, this might generate a lot of >>>>>>>> data. In >>>>>>>> most cases, users cluster/sort their data by a subset of data columns >>>>>>>> to >>>>>>>> have fast queries with predicates on those columns. So, being able to >>>>>>>> configure columns for which to collect lower/upper bounds seems >>>>>>>> reasonable. >>>>>>>> >>>>>>>> On 19 Apr 2019, at 08:03, Gautam <[email protected]> wrote: >>>>>>>> >>>>>>>> > The length in bytes of the schema is 109M as compared to 687K of >>>>>>>> the non-stats dataset. >>>>>>>> >>>>>>>> Typo: length in bytes of the *manifest*. The schema is the same. >>>>>>>> >>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Correction, partition count = 4308. >>>>>>>>> >>>>>>>>> > Re: Changing the way we keep stats. Avro is a block splittable >>>>>>>>> format and is friendly with parallel compute frameworks like Spark. >>>>>>>>> >>>>>>>>> Here I am trying to say that we don't need to change the format to >>>>>>>>> columnar, right? The current format is already friendly for >>>>>>>>> parallelization. >>>>>>>>> >>>>>>>>> thanks. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <[email protected]> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Ah, my bad. I missed adding in the schema details .. Here are >>>>>>>>>> some details on the dataset with stats: >>>>>>>>>> >>>>>>>>>> Iceberg Schema Columns : 20 >>>>>>>>>> Spark Schema fields : 20 >>>>>>>>>> Snapshot Summary :{added-data-files=4308, >>>>>>>>>> added-records=11494037, changed-partition-count=4308, >>>>>>>>>> total-records=11494037, total-data-files=4308} >>>>>>>>>> Manifest files :1 >>>>>>>>>> Manifest details: >>>>>>>>>> => manifest file path: >>>>>>>>>> adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro >>>>>>>>>> => manifest file length: 109,028,885 >>>>>>>>>> => existing files count: 0 >>>>>>>>>> => added files count: 4308 >>>>>>>>>> => deleted files count: 0 >>>>>>>>>> => partitions count: 4 >>>>>>>>>> => partition fields count: 4 >>>>>>>>>> >>>>>>>>>> Re: Num data files. It has a single manifest keeping track of 4308 >>>>>>>>>> files. Total record count is 11.4 million. >>>>>>>>>> >>>>>>>>>> Re: Columns. You are right that this table has many columns: >>>>>>>>>> although it has only 20 top-level columns, the number of leaf columns is in the >>>>>>>>>> order >>>>>>>>>> of thousands. This schema is heavy on structs (in the thousands) and >>>>>>>>>> has >>>>>>>>>> deep levels of nesting. I know Iceberg keeps >>>>>>>>>> *column_sizes, value_counts, null_value_counts* for all leaf >>>>>>>>>> fields and additionally *lower-bounds, upper-bounds* for native and >>>>>>>>>> struct types (not yet for map KVs and arrays). The length in bytes >>>>>>>>>> of the >>>>>>>>>> schema is 109M as compared to 687K of the non-stats dataset. >>>>>>>>>> >>>>>>>>>> Re: Turning off stats.
I am looking to leverage stats because for our >>>>>>>>>> datasets with a much larger number of data files we want to leverage >>>>>>>>>> Iceberg's ability to skip entire files based on these stats. This is >>>>>>>>>> one of >>>>>>>>>> the big incentives for us to use Iceberg. >>>>>>>>>> >>>>>>>>>> Re: Changing the way we keep stats. Avro is a block splittable >>>>>>>>>> format and is friendly with parallel compute frameworks like Spark. >>>>>>>>>> So >>>>>>>>>> would it make sense, for instance, to add an option to have a Spark >>>>>>>>>> job / >>>>>>>>>> Futures handle split planning? In a larger context, 109M is not >>>>>>>>>> that >>>>>>>>>> much metadata given that Iceberg is meant for datasets where the >>>>>>>>>> metadata >>>>>>>>>> itself is big-data scale. I'm curious how folks with larger-sized >>>>>>>>>> metadata (in GB) are optimizing this today. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Cheers, >>>>>>>>>> -Gautam. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> Thanks for bringing this up! My initial theory is that this >>>>>>>>>>> table has a ton of stats data that you have to read. That could >>>>>>>>>>> happen in a >>>>>>>>>>> couple of cases. >>>>>>>>>>> >>>>>>>>>>> First, you might have large values in some columns. Parquet will >>>>>>>>>>> suppress its stats if values are larger than 4k, and those are what >>>>>>>>>>> Iceberg >>>>>>>>>>> uses. But that could still cause you to store two 1k+ objects for >>>>>>>>>>> each >>>>>>>>>>> large column (lower and upper bounds). With a lot of data files, >>>>>>>>>>> that could >>>>>>>>>>> add up quickly. The solution here is to implement #113 >>>>>>>>>>> <https://github.com/apache/incubator-iceberg/issues/113> so >>>>>>>>>>> that we don't store the actual min and max for string or binary >>>>>>>>>>> columns, >>>>>>>>>>> but instead a truncated value that is just above or just below. >>>>>>>>>>> >>>>>>>>>>> The second case is when you have a lot of columns. Each column >>>>>>>>>>> stores both a lower and upper bound, so 1,000 columns could easily >>>>>>>>>>> take 8k >>>>>>>>>>> per file. If this is the problem, then maybe we want to have a way >>>>>>>>>>> to turn >>>>>>>>>>> off column stats. We could also think of ways to change the way >>>>>>>>>>> stats are >>>>>>>>>>> stored in the manifest files, but that only helps if we move to a >>>>>>>>>>> columnar >>>>>>>>>>> format to store manifests, so this is probably not a short-term fix. >>>>>>>>>>> >>>>>>>>>>> If you can share a bit more information about this table, we can >>>>>>>>>>> probably tell which one is the problem. I'm guessing it is the >>>>>>>>>>> large values >>>>>>>>>>> problem. >>>>>>>>>>> >>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hello folks, >>>>>>>>>>>> >>>>>>>>>>>> I have been testing Iceberg reading with and without stats >>>>>>>>>>>> built into the Iceberg dataset manifests and found that there's a huge >>>>>>>>>>>> jump in >>>>>>>>>>>> network traffic with the latter.. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in >>>>>>>>>>>> Iceberg format. One with and the other without stats collected in >>>>>>>>>>>> Iceberg >>>>>>>>>>>> manifests.
In particular the difference between the writers used >>>>>>>>>>>> for the >>>>>>>>>>>> two datasets is this PR: >>>>>>>>>>>> https://github.com/apache/incubator-iceberg/pull/63/files which >>>>>>>>>>>> uses Iceberg's writers for writing Parquet data. I captured >>>>>>>>>>>> tcpdump from >>>>>>>>>>>> query scans run on these two datasets. The partition being scanned >>>>>>>>>>>> contains 1 manifest, 1 parquet data file and ~3700 rows in both >>>>>>>>>>>> datasets. >>>>>>>>>>>> There's a 30x jump in network traffic to the remote filesystem >>>>>>>>>>>> (ADLS) when >>>>>>>>>>>> i switch to stats based Iceberg dataset. Both queries used the >>>>>>>>>>>> same Iceberg >>>>>>>>>>>> reader code to access both datasets. >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep >>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l >>>>>>>>>>>> reading from file >>>>>>>>>>>> iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type >>>>>>>>>>>> EN10MB >>>>>>>>>>>> (Ethernet) >>>>>>>>>>>> >>>>>>>>>>>> *8844* >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r >>>>>>>>>>>> iceberg_scratch_pad_demo_11_batch_query.pcap | grep >>>>>>>>>>>> perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l >>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, >>>>>>>>>>>> link-type EN10MB (Ethernet) >>>>>>>>>>>> >>>>>>>>>>>> *269708* >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> >>>>>>>>>>>> As a consequence of this the query response times get affected >>>>>>>>>>>> drastically (illustrated below). I must confess that I am on a slow >>>>>>>>>>>> internet connection via VPN connecting to the remote FS. But the >>>>>>>>>>>> dataset >>>>>>>>>>>> without stats took just 1m 49s while the dataset with stats took >>>>>>>>>>>> 26m 48s to >>>>>>>>>>>> read the same sized data. Most of that time in the latter dataset >>>>>>>>>>>> was spent >>>>>>>>>>>> split planning in Manifest reading and stats evaluation. >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues >>>>>>>>>>>> where batchId = '4a6f95abac924159bb3d7075373395c9'; >>>>>>>>>>>> count(1) >>>>>>>>>>>> ---------- >>>>>>>>>>>> 3627 >>>>>>>>>>>> (1 row) >>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)* >>>>>>>>>>>> >>>>>>>>>>>> all=> select count(*) from iceberg_scratch_pad_demo_11 where >>>>>>>>>>>> _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = >>>>>>>>>>>> '6d50eeb3e7d74b4f99eea91a27fc8f15'; >>>>>>>>>>>> count(1) >>>>>>>>>>>> ---------- >>>>>>>>>>>> 3808 >>>>>>>>>>>> (1 row) >>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)* >>>>>>>>>>>> >>>>>>>>>>>> ``` >>>>>>>>>>>> >>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or >>>>>>>>>>>> parallelism option here that can be leveraged. Would appreciate >>>>>>>>>>>> some >>>>>>>>>>>> guidance. If there isn't a straightforward fix and others feel >>>>>>>>>>>> this is an >>>>>>>>>>>> issue I can raise an issue and look into it further. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Cheers, >>>>>>>>>>>> -Gautam. 
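As an aside on sizing the stats themselves, here is a hedged sketch, using the ManifestFiles/DataFile read API from more recent Iceberg releases (the manifest and io arguments are assumed to come from table.currentSnapshot() and table.io()), that totals the bytes of lower/upper bounds a single manifest carries; this is a quick way to check whether bound values or sheer leaf-column count dominate a manifest of this size:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.io.FileIO;

public class ManifestStatsFootprint {
  public static void printBoundsFootprint(ManifestFile manifest, FileIO io) throws IOException {
    long boundBytes = 0;
    long dataFiles = 0;
    try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io)) {
      for (DataFile file : reader) {
        dataFiles += 1;
        // lower/upper bounds are stored per leaf field id as serialized ByteBuffers
        if (file.lowerBounds() != null) {
          for (ByteBuffer bound : file.lowerBounds().values()) {
            boundBytes += bound.remaining();
          }
        }
        if (file.upperBounds() != null) {
          for (ByteBuffer bound : file.upperBounds().values()) {
            boundBytes += bound.remaining();
          }
        }
      }
    }
    System.out.printf("%d data files, ~%d bytes of lower/upper bounds%n", dataFiles, boundBytes);
  }
}
```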
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Ryan Blue >>>>>>>>>>> Software Engineer >>>>>>>>>>> Netflix >>>>>>>>>>> >>>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> -- >>>>>> Ryan Blue >>>>>> Software Engineer >>>>>> Netflix >>>>>> >>>>> >>>>> >>> >>> -- >>> Ryan Blue >>> Software Engineer >>> Netflix >>> >> -- Ryan Blue Software Engineer Netflix
