FYI, the test partition spec is:

[
  YEAR: identity(21)
  MONTH: identity(22)
  DAY: identity(23)
  batchId: identity(24)
]
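For anyone trying to reproduce this locally, the spec above plus the manifest-merge change Ryan suggested would look roughly like the following through the Java API. The schema below is a made-up skeleton (only the partition source columns, with placeholder types), the table location is a placeholder, and the 5,000 is just the value from Ryan's suggestion for this test:

```
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class TestTableSetup {
  public static void main(String[] args) {
    // Skeleton schema: only the partition source columns, with the field ids
    // (21-24) taken from the spec dump above; the types are placeholders.
    Schema schema = new Schema(
        Types.NestedField.optional(21, "YEAR", Types.IntegerType.get()),
        Types.NestedField.optional(22, "MONTH", Types.IntegerType.get()),
        Types.NestedField.optional(23, "DAY", Types.IntegerType.get()),
        Types.NestedField.optional(24, "batchId", Types.StringType.get()));

    // Identity partitioning on all four columns, matching the spec above.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("YEAR")
        .identity("MONTH")
        .identity("DAY")
        .identity("batchId")
        .build();
    System.out.println(spec);

    // The two knobs Ryan mentioned: keep the 8 MB target size, and raise the
    // min merge count so a ~4,000-manifest table isn't compacted on commit.
    // The location is a placeholder, not the real test table path.
    Table table = new HadoopTables(new Configuration()).load("adl://.../test_table");
    table.updateProperties()
        .set("commit.manifest.target-size-bytes", String.valueOf(8L * 1024 * 1024))
        .set("commit.manifest.min-count-to-merge", "5000")
        .commit();
  }
}
```

The min-count bump is only to stop manifest compaction for this parallelism test; we'd presumably keep the defaults in prod.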
On Thu, May 2, 2019 at 1:46 PM Gautam <[email protected]> wrote:

> > Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.

> This is helpful. Thanks for the pointer on increasing parallelism. Will try this out. So that I understand the behaviour: if a different dataset has >= 5,000 batches, would the resultant number of manifests be (total_num_batches % 5000)?

> > What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it.

> Pardon the verbose example, but I think it'll help explain what I'm seeing.

> Regarding manifest filtering: I tested whether partition filters in the SQL query actually reduce the manifests being inspected. In my example, I have 16 manifests that point to 4,000 batch partitions (each file is restricted to one partition, as we're using physical partitioning in the table). So when querying with .. WHERE batchId = 'xyz' .. at most 1 manifest should be read, because 1 batch == 1 file, which should be tracked by 1 manifest (among the 16), right? But I see that all 16 are being inspected in BaseTableScan.planFiles(). Correct me if I'm wrong, but it's this call [1] that should be giving me the manifests that match a partition. When I inspect this it says `matchingManifests = 16`, which is all the manifests in the table. This *could* be because our batch ids are random UUIDs, so lower/upper bounds may not help since there's no inherent ordering amongst batches. But then I tried year = 2019 and month = 01 and day = 01, which also scanned all manifests. Could this be due to the way Iceberg manifests are re-grouped and merged? If so, shouldn't re-grouping honour partition boundaries and optimize for it?

> Cheers,
> -Gautam.

> [1] - https://github.com/apache/incubator-iceberg/blob/master/core/src/main/java/org/apache/iceberg/BaseTableScan.java#L173

> On Thu, May 2, 2019 at 12:27 PM Ryan Blue <[email protected]> wrote:

>> Good questions. Grouping manifests is configurable at the table level. There are 2 settings:

>> - commit.manifest.target-size-bytes defaults to 8 MB; this is the target size that Iceberg will compact to
>> - commit.manifest.min-count-to-merge defaults to 100; this is the minimum number of files before a compaction is triggered

>> Using those, you should be able to control parallelism. If you want to test with 4,000, then you can set the min count to 5,000 so Iceberg won’t compact manifests.

>> What surprises me is that you’re not getting much benefit from filtering out manifests that aren’t helpful. We see a lot of benefit from it. You might try sorting the data files by partition before adding them to the table. That will cluster data files in the same partition so you can read fewer manifests.

>> On Thu, May 2, 2019 at 12:09 PM Gautam <[email protected]> wrote:

>>> Hey Anton, sorry about the delay on this; I've been caught up with some other things. Thanks for raising issue #173.

>>> So the root cause is indeed the density and size of the schema. I agree the option to configure stats for columns is good, although I'm not fully convinced that this is purely due to lower/upper bounds. For instance, maybe it's just taking a while to iterate over manifest rows and deserialize the DataFile stats in each read?
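>>> (A quick way to isolate that is to time bare split planning on its own, e.g. something like the sketch below. The table location and filter value are placeholders and this is a rough harness rather than our actual test code, but iterating planFiles() forces every matching manifest entry and its stats to be read:)

>>> ```
>>> import org.apache.hadoop.conf.Configuration;
>>> import org.apache.iceberg.FileScanTask;
>>> import org.apache.iceberg.Table;
>>> import org.apache.iceberg.expressions.Expressions;
>>> import org.apache.iceberg.hadoop.HadoopTables;
>>> import org.apache.iceberg.io.CloseableIterable;
>>>
>>> public class PlanFilesTiming {
>>>   public static void main(String[] args) throws Exception {
>>>     // Placeholder location; for us this would be the ADLS path of the stats-enabled table.
>>>     Table table = new HadoopTables(new Configuration()).load("adl://.../iceberg_table");
>>>
>>>     long start = System.nanoTime();
>>>     long tasks = 0;
>>>     try (CloseableIterable<FileScanTask> plan =
>>>              table.newScan()
>>>                   .filter(Expressions.equal("batchId", "some-batch-uuid"))
>>>                   .planFiles()) {
>>>       for (FileScanTask task : plan) {
>>>         tasks++;  // iterating forces each matching manifest entry (and its stats) to be deserialized
>>>       }
>>>     }
>>>     System.out.printf("planned %d tasks in %d ms%n",
>>>         tasks, (System.nanoTime() - start) / 1_000_000);
>>>   }
>>> }
>>> ```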
>>> The solution I'm using right now is to parallelize the manifest reading in split planning. We regenerated the Iceberg table with more manifests. Now the code enables the ParallelIterator, which uses a worker pool of threads (1 thread per CPU by default, configurable using 'iceberg.worker.num-threads') to read manifests.

>>> On that note, the ability to parallelize is limited by how many manifests are in the table. As a test, for a table with 4,000 files we created one manifest per file (think of one file as a single batch commit in this case), so I was hoping to get a parallelism factor of 4,000. But Iceberg summarizes manifests into fewer manifests with each commit, so we instead ended up with 16 manifests, and split planning is now limited to at most 16 units of parallelism. Is this grouping of manifests into fewer files configurable? If not, should we allow making it configurable?

>>> Sorry if this is forking a different conversation. If so, I can start a separate thread on this.

>>> On Wed, May 1, 2019 at 9:42 PM Anton Okolnychyi <[email protected]> wrote:

>>>> Hey Gautam,

>>>> Out of curiosity, did you manage to confirm the root cause of the issue?

>>>> P.S. I created [1] so that we can make collection of lower/upper bounds configurable.

>>>> Thanks,
>>>> Anton

>>>> [1] - https://github.com/apache/incubator-iceberg/issues/173

>>>> On 22 Apr 2019, at 09:15, Gautam <[email protected]> wrote:

>>>> Thanks, guys, for the insights.

>>>> > I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.

>>>> This makes sense to me. It adds a variable that data pipelines can tweak to improve performance. I will add an issue on GitHub to add a stats config/flag. Having said that, I would try to optimize around this, because read patterns are hardly ever known a priori and adding a column to this list means having to re-write the entire dataset. So I'll try the other suggestion, which is parallelizing over multiple manifests.

>>>> > To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.

>>>> Gotcha.

>>>> > Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.

>>>> This might actually be the issue here: this was a test-bench dataset, so the writer job created a single manifest for all the data in the dataset, which isn't really how we will do things in prod.
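>>>> (Concretely, splitting the write across several manifests would mean one append per ingested batch instead of one bulk add of all 4,000 files. A rough sketch, where the table location and the `batches` collection are placeholders:)

>>>> ```
>>>> import java.util.List;
>>>> import org.apache.hadoop.conf.Configuration;
>>>> import org.apache.iceberg.AppendFiles;
>>>> import org.apache.iceberg.DataFile;
>>>> import org.apache.iceberg.Table;
>>>> import org.apache.iceberg.hadoop.HadoopTables;
>>>>
>>>> public class PerBatchCommits {
>>>>   // 'batches' is a placeholder for however the ingest job groups its DataFiles.
>>>>   static void commitPerBatch(List<List<DataFile>> batches) {
>>>>     Table table = new HadoopTables(new Configuration()).load("adl://.../iceberg_table");
>>>>     for (List<DataFile> batch : batches) {
>>>>       AppendFiles append = table.newAppend();
>>>>       batch.forEach(append::appendFile);
>>>>       append.commit();  // each commit writes its own manifest
>>>>     }
>>>>   }
>>>> }
>>>> ```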
>>>> I'll try to create the metadata based on production's expected commit pattern.

>>>> Regarding Iceberg not truncating large bounded column values (https://github.com/apache/incubator-iceberg/issues/113): I didn't consider this with our dataset. The current evidence points towards the number of columns and the sheer number of files that the manifest is maintaining, but this is a good thing to look into.

>>>> Thanks again, guys.

>>>> -Gautam.

>>>> On Fri, Apr 19, 2019 at 9:05 AM Ryan Blue <[email protected]> wrote:

>>>>> I like Anton's idea to have an optional list of columns for which we keep stats. That would allow us to avoid storing stats for thousands of columns that won't ever be used. Another option here is to add a flag to keep stats only for top-level columns. That's much less configuration for users and probably does the right thing in many cases. Simpler to use but not as fast in all cases is sometimes a good compromise.

>>>>> To clarify my comment on changing the storage: the idea is to use separate columns instead of a map and then use a columnar storage format so we can project those columns independently. Avro can't project columns independently. This wouldn't help on the write side and may just cause a lot of seeking on the read side that diminishes the benefits.

>>>>> Also, now that we have more details, I think there is a second problem. Because we expect several manifests in a table, we parallelize split planning on manifests instead of splits of manifest files. This planning operation is happening in a single thread instead of in parallel. I think if you split the write across several manifests, you'd improve wall time.

>>>>> On Fri, Apr 19, 2019 at 8:15 AM Anton Okolnychyi <[email protected]> wrote:

>>>>>> No, we haven’t experienced it yet. The manifest size is huge in your case. I think Ryan is correct: it might be either big lower/upper bounds (then truncation will help) or a big number of columns (then collecting lower/upper bounds only for specific columns will help). I think both optimizations are needed and will reduce the manifest size.

>>>>>> Since you mentioned you have a lot of columns and we collect bounds for nested struct fields, I am wondering if you could revert [1] locally and compare the manifest size.

>>>>>> [1] - https://github.com/apache/incubator-iceberg/commit/c383dd87a89e35d622e9c458fd711931cbc5e96f

>>>>>> On 19 Apr 2019, at 15:42, Gautam <[email protected]> wrote:

>>>>>> Thanks for responding, Anton! Do we think the delay is mainly due to lower/upper bound filtering? Have you faced this? I haven't pinned down exactly where the slowness is yet. It's generally in the stats filtering, but I don't know what part of it is causing this much network traffic. There's a CloseableIterable that takes a ton of time on the next() and hasNext() calls. My guess is that the expression evaluation on each manifest entry is what's doing it.

>>>>>> On Fri, Apr 19, 2019 at 1:41 PM Anton Okolnychyi <[email protected]> wrote:

>>>>>>> I think we need to have a list of columns for which we want to collect stats, and that should be configurable by the user. Maybe this config should be applicable only to lower/upper bounds.
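>>>>>>> (To make the proposal concrete, it could look something like the sketch below. The property keys are purely illustrative and do not exist today; only the updateProperties() call itself is existing API, and the table location is a placeholder:)

>>>>>>> ```
>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>> import org.apache.iceberg.Table;
>>>>>>> import org.apache.iceberg.hadoop.HadoopTables;
>>>>>>>
>>>>>>> public class StatsConfigIdea {
>>>>>>>   public static void main(String[] args) {
>>>>>>>     Table table = new HadoopTables(new Configuration()).load("adl://.../iceberg_table");
>>>>>>>     table.updateProperties()
>>>>>>>         // Hypothetical keys, sketching the proposed per-column stats config:
>>>>>>>         // counts everywhere, full lower/upper bounds only for the filter column.
>>>>>>>         .set("write.metadata.metrics.default", "counts")
>>>>>>>         .set("write.metadata.metrics.column.batchId", "full")
>>>>>>>         .commit();
>>>>>>>   }
>>>>>>> }
>>>>>>> ```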
>>>>>>> Since we now collect stats even for nested struct fields, this might generate a lot of data. In most cases, users cluster/sort their data by a subset of data columns to have fast queries with predicates on those columns. So, being able to configure the columns for which to collect lower/upper bounds seems reasonable.

>>>>>>> On 19 Apr 2019, at 08:03, Gautam <[email protected]> wrote:

>>>>>>> > The length in bytes of the schema is 109M as compared to 687K of the non-stats dataset.

>>>>>>> Typo: length in bytes of the *manifest*. The schema is the same.

>>>>>>> On Fri, Apr 19, 2019 at 12:16 PM Gautam <[email protected]> wrote:

>>>>>>>> Correction, partition count = 4308.

>>>>>>>> > Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly to parallel compute frameworks like Spark.

>>>>>>>> Here I am trying to say that we don't need to change the format to columnar, right? The current format is already friendly to parallelization.

>>>>>>>> Thanks.

>>>>>>>> On Fri, Apr 19, 2019 at 12:12 PM Gautam <[email protected]> wrote:

>>>>>>>>> Ah, my bad, I missed adding the schema details. Here are some details on the dataset with stats:

>>>>>>>>> Iceberg Schema Columns: 20
>>>>>>>>> Spark Schema fields: 20
>>>>>>>>> Snapshot Summary: {added-data-files=4308, added-records=11494037, changed-partition-count=4308, total-records=11494037, total-data-files=4308}
>>>>>>>>> Manifest files: 1
>>>>>>>>> Manifest details:
>>>>>>>>> => manifest file path: adl://[dataset_base_path]/metadata/4bcda033-9df5-4c84-8eef-9d6ef93e4347-m0.avro
>>>>>>>>> => manifest file length: 109,028,885
>>>>>>>>> => existing files count: 0
>>>>>>>>> => added files count: 4308
>>>>>>>>> => deleted files count: 0
>>>>>>>>> => partitions count: 4
>>>>>>>>> => partition fields count: 4

>>>>>>>>> Re: Num data files. It has a single manifest keeping track of 4,308 files. Total record count is 11.4 million.

>>>>>>>>> Re: Columns. You are right that this table has many columns: although it has only 20 top-level columns, the number of leaf columns is in the order of thousands. This schema is heavy on structs (in the thousands) and has deep levels of nesting. I know Iceberg keeps *column_sizes, value_counts, null_value_counts* for all leaf fields, and additionally *lower-bounds, upper-bounds* for native and struct types (not yet for map KVs and arrays). The length in bytes of the schema is 109M as compared to 687K for the non-stats dataset.

>>>>>>>>> Re: Turning off stats. I am looking to keep the stats because, for our datasets with a much larger number of data files, we want to leverage Iceberg's ability to skip entire files based on these stats. This is one of the big incentives for us to use Iceberg.

>>>>>>>>> Re: Changing the way we keep stats. Avro is a block-splittable format and is friendly to parallel compute frameworks like Spark. So would it make sense, for instance, to add an option to have a Spark job / Futures handle split planning?
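>>>>>>>>> (The manifest details above can also be pulled programmatically by walking the current snapshot's manifest list, roughly as below; the table location is a placeholder and the method names are from memory, so treat it as approximate:)

>>>>>>>>> ```
>>>>>>>>> import org.apache.hadoop.conf.Configuration;
>>>>>>>>> import org.apache.iceberg.ManifestFile;
>>>>>>>>> import org.apache.iceberg.Table;
>>>>>>>>> import org.apache.iceberg.hadoop.HadoopTables;
>>>>>>>>>
>>>>>>>>> public class DumpManifestDetails {
>>>>>>>>>   public static void main(String[] args) {
>>>>>>>>>     // Placeholder location for the stats-enabled table.
>>>>>>>>>     Table table = new HadoopTables(new Configuration()).load("adl://.../iceberg_table");
>>>>>>>>>     for (ManifestFile manifest : table.currentSnapshot().manifests()) {
>>>>>>>>>       System.out.printf("path=%s length=%,d added=%d existing=%d deleted=%d%n",
>>>>>>>>>           manifest.path(), manifest.length(),
>>>>>>>>>           manifest.addedFilesCount(), manifest.existingFilesCount(),
>>>>>>>>>           manifest.deletedFilesCount());
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>> ```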
>>>>>>>>> In a larger context, 109M is not that much metadata, given that Iceberg is meant for datasets where the metadata itself is big-data scale. I'm curious how folks with larger metadata (in the GBs) are optimizing this today.

>>>>>>>>> Cheers,
>>>>>>>>> -Gautam.

>>>>>>>>> On Fri, Apr 19, 2019 at 12:40 AM Ryan Blue <[email protected]> wrote:

>>>>>>>>>> Thanks for bringing this up! My initial theory is that this table has a ton of stats data that you have to read. That could happen in a couple of cases.

>>>>>>>>>> First, you might have large values in some columns. Parquet will suppress its stats if values are larger than 4k, and those are what Iceberg uses. But that could still cause you to store two 1k+ objects for each large column (lower and upper bounds). With a lot of data files, that could add up quickly. The solution here is to implement #113 <https://github.com/apache/incubator-iceberg/issues/113> so that we don't store the actual min and max for string or binary columns, but instead a truncated value that is just above or just below.

>>>>>>>>>> The second case is when you have a lot of columns. Each column stores both a lower and upper bound, so 1,000 columns could easily take 8k per file. If this is the problem, then maybe we want to have a way to turn off column stats. We could also think of ways to change the way stats are stored in the manifest files, but that only helps if we move to a columnar format to store manifests, so this is probably not a short-term fix.

>>>>>>>>>> If you can share a bit more information about this table, we can probably tell which one is the problem. I'm guessing it is the large values problem.

>>>>>>>>>> On Thu, Apr 18, 2019 at 11:52 AM Gautam <[email protected]> wrote:

>>>>>>>>>>> Hello folks,

>>>>>>>>>>> I have been testing Iceberg reads with and without stats built into the Iceberg dataset manifest, and found that there's a huge jump in network traffic with the latter.

>>>>>>>>>>> In my test I am comparing two Iceberg datasets, both written in Iceberg format: one with and the other without stats collected in the Iceberg manifests. In particular, the difference between the writers used for the two datasets is this PR: https://github.com/apache/incubator-iceberg/pull/63/files which uses Iceberg's writers for writing Parquet data. I captured tcpdump from query scans run on these two datasets. The partition being scanned contains 1 manifest, 1 Parquet data file, and ~3,700 rows in both datasets. There's a 30x jump in network traffic to the remote filesystem (ADLS) when I switch to the stats-based Iceberg dataset. Both queries used the same Iceberg reader code to access both datasets.
>>>>>>>>>>> ```
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file iceberg_geo1_metrixx_qc_postvalues_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>> *8844*
>>>>>>>>>>>
>>>>>>>>>>> root@d69e104e7d40:/usr/local/spark# tcpdump -r iceberg_scratch_pad_demo_11_batch_query.pcap | grep perfanalysis.adlus15.projectcabostore.net | grep ">" | wc -l
>>>>>>>>>>> reading from file iceberg_scratch_pad_demo_11_batch_query.pcap, link-type EN10MB (Ethernet)
>>>>>>>>>>> *269708*
>>>>>>>>>>> ```

>>>>>>>>>>> As a consequence, the query response times are affected drastically (illustrated below). I must confess that I am on a slow internet connection via VPN connecting to the remote FS. But the dataset without stats took just 1m 49s, while the dataset with stats took 26m 48s to read the same amount of data. Most of that time in the latter dataset was spent in split planning, on manifest reading and stats evaluation.

>>>>>>>>>>> ```
>>>>>>>>>>> all=> select count(*) from iceberg_geo1_metrixx_qc_postvalues where batchId = '4a6f95abac924159bb3d7075373395c9';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3627
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 109673.202 ms (01:49.673)*
>>>>>>>>>>>
>>>>>>>>>>> all=> select count(*) from iceberg_scratch_pad_demo_11 where _ACP_YEAR=2018 and _ACP_MONTH=01 and _ACP_DAY=01 and batchId = '6d50eeb3e7d74b4f99eea91a27fc8f15';
>>>>>>>>>>>  count(1)
>>>>>>>>>>> ----------
>>>>>>>>>>>      3808
>>>>>>>>>>> (1 row)
>>>>>>>>>>> *Time: 1608058.616 ms (26:48.059)*
>>>>>>>>>>> ```

>>>>>>>>>>> Has anyone faced this? I'm wondering if there's some caching or parallelism option here that can be leveraged. I would appreciate some guidance. If there isn't a straightforward fix and others feel this is a problem, I can raise an issue and look into it further.

>>>>>>>>>>> Cheers,
>>>>>>>>>>> -Gautam.

>>>>>>>>>> --
>>>>>>>>>> Ryan Blue
>>>>>>>>>> Software Engineer
>>>>>>>>>> Netflix

>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix

>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
