Yeah, Dan is right. If you want to use HDFS tables, then you have to use a Hadoop FileSystem directly, since the FileIO interface doesn't include rename (deliberately; we don't want FileIO implementations to depend on it).
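For reference, a minimal sketch of the metastore-catalog route discussed below. The catalog name, the Glue catalog choice, and the warehouse path are placeholders, and it assumes the iceberg-aws module and an AWS v2 SDK bundle are on the classpath:

    import org.apache.spark.sql.SparkSession;

    public class IcebergS3CatalogExample {
      public static void main(String[] args) {
        // Register an Iceberg catalog backed by a metastore (Glue here), with
        // S3FileIO handling metadata and data I/O so no filesystem rename is
        // ever needed.
        SparkSession spark = SparkSession.builder()
            .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.my_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
            .config("spark.sql.catalog.my_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
            .config("spark.sql.catalog.my_catalog.warehouse", "s3://mybucket/warehouse")
            .getOrCreate();

        // Tables are then addressed through the catalog rather than by path.
        spark.sql("SELECT * FROM my_catalog.db.table").show();
      }
    }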
On Thu, Jun 10, 2021 at 8:52 AM Scott Kruger <sckru...@paypal.com.invalid> wrote:

The DynamoDB catalog sounds interesting; I’ll keep my eye on that. There’s got to be some way to manage tables in 0.11.1 with S3FileIO though, right? We’re using Spark 3; perhaps we can use `SparkCatalog` instead of `HadoopTables`?

*From:* Daniel Weeks <dwe...@apache.org>
*Reply-To:* "dev@iceberg.apache.org" <dev@iceberg.apache.org>
*Date:* Thursday, June 10, 2021 at 10:36 AM
*To:* Iceberg Dev List <dev@iceberg.apache.org>
*Subject:* Re: Consistency problems with Iceberg + EMRFS

Scott, I don't think you can use S3FileIO with HadoopTables, because HadoopTables requires file system support for operations like rename, and FileIO is not intended to support those features.

I think a really promising alternative is the DynamoDB Catalog implementation <https://github.com/apache/iceberg/pull/2688/files> that Jack Ye just submitted (still under review).

-Dan

On Thu, Jun 10, 2021 at 7:26 AM Scott Kruger <sckru...@paypal.com.invalid> wrote:

Going to bump this question then:

Not using EMRFS for the metadata is an interesting possibility. We’re using HadoopTables currently; is there a Tables implementation that uses S3FileIO that we can use, or can I somehow tell HadoopTables to use S3FileIO?

*From:* Ryan Blue <b...@apache.org>
*Reply-To:* "dev@iceberg.apache.org" <dev@iceberg.apache.org>
*Date:* Wednesday, June 9, 2021 at 4:10 PM
*To:* "dev@iceberg.apache.org" <dev@iceberg.apache.org>
*Subject:* Re: Consistency problems with Iceberg + EMRFS

Thanks for the additional detail. If you're not writing concurrently, then that eliminates the explanations that I had. I also don't think that Iceberg retries would be a problem, because Iceberg will only retry if the commit fails, and there is no reason for a commit to fail and retry when nothing else is trying to modify the table. To make sure, you can check for "Retrying" logs from Iceberg.

Now that I'm looking more closely at the second error, I see that it is also caused by the eTag mismatch. I wonder if this might be a different level of retry. Maybe EMRFS hits a transient error that causes an internal retry on the write, and that retry is the source of the consistency error?

What you may be able to do to solve this is to use S3FileIO instead of EMRFS.

Ryan

On Wed, Jun 9, 2021 at 9:02 AM Scott Kruger <sckru...@paypal.com.invalid> wrote:

Here’s a little more detail on our use case that might be helpful. We’re running a batch process to apply CDC to several hundred tables every few hours; we use Iceberg (via HadoopTables) on top of a traditional Hive external-table model (EMRFS + Parquet + Glue metastore) to track the commits (that is, changes to the list of files) to these tables.
There are a number of technical and “political” reasons for this that don’t really bear going into; all we really needed was a way to track the files belonging to a table that are managed via some process external to Iceberg. We have a few guarantees:

- Tables never, *ever* see concurrent writes; only one application writes to these tables, and only one instance of this application ever exists at any time
- Our application rewrites entire partitions to new directories, so we don’t need Iceberg to help us read a handful of files from directories with files from multiple commits
- Our interaction with the Iceberg API is *extremely* limited:

    overwrite = table.newOverwrite()
    for each updated partition
        for each file in old partition directory
            overwrite.deleteFile(file)
        for each file in new partition directory
            overwrite.addFile(file)
    overwrite.commit()

So, all that being said, now to address your comments. We don’t have concurrent processes writing commits, so the problem has to be contained in that pseudocode block above. We don’t ever have any consistency issues with the actual data files we write (using plain Spark DataFrameWriter.parquet), so there has to be something going on with how Iceberg writes metadata over EMRFS. It feels like retry logic is a likely culprit, as this only happens about once a day across something like 10,000 commits. Using the metastore is unfortunately a non-starter for us, but given that we don’t need to support concurrent writes, I don’t think this is a problem.

Not using EMRFS for the metadata is an interesting possibility. We’re using HadoopTables currently; is there a Tables implementation that uses S3FileIO that we can use, or can I somehow tell HadoopTables to use S3FileIO?
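As a rough illustration of how limited that interaction is, the pseudocode above maps onto the Java API like this (a minimal sketch; commitRewrite and its file lists are hypothetical stand-ins for the application's own partition bookkeeping):

    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Table;

    public class PartitionRewrite {
      // oldFiles: the DataFiles previously committed for the rewritten partitions;
      // newFiles: the DataFiles just written to the new partition directories.
      static void commitRewrite(Table table, List<DataFile> oldFiles, List<DataFile> newFiles) {
        OverwriteFiles overwrite = table.newOverwrite();
        for (DataFile file : oldFiles) {
          overwrite.deleteFile(file);  // drop every file from the old directories
        }
        for (DataFile file : newFiles) {
          overwrite.addFile(file);     // register every file from the new directories
        }
        overwrite.commit();            // a single atomic commit for the whole rewrite
      }
    }

Everything interesting happens in that final commit(), which is where HadoopTables writes a new metadata file and version hint.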
*From:* Jack Ye <yezhao...@gmail.com>
*Reply-To:* "dev@iceberg.apache.org" <dev@iceberg.apache.org>
*Date:* Tuesday, June 8, 2021 at 7:49 PM
*To:* "dev@iceberg.apache.org" <dev@iceberg.apache.org>
*Subject:* Re: Consistency problems with Iceberg + EMRFS

There are two potential root causes I see:

1. You might be using EMRFS with DynamoDB enabled to check consistency, which can leave DynamoDB and S3 out of sync. The quick solution is to just delete the DynamoDB consistency table; the next read/write will recreate and resync it. After all, EMRFS only provides read-after-write consistency for S3, and S3 is now already strongly consistent, so there is really no need to use EMRFS anymore.

2. HadoopCatalog on S3 always has the possibility of one process clobbering another when writing the version-hint.txt file. So, as Ryan suggested, it is always better to use a metastore to perform consistency checks instead of delegating them to the file system.

-Jack

On Tue, Jun 8, 2021 at 5:41 PM Ryan Blue <b...@apache.org> wrote:

Hi Scott,

I'm not quite sure what's happening here, but I should at least note that we didn't intend for HDFS tables to be used with S3. HDFS tables use an atomic rename in the file system to ensure that only one committer "wins" to produce a given version of the table metadata. In S3, renames are not atomic, so you can get into trouble if there are two concurrent processes trying to rename to the same target version. That's probably what's causing the first issue, where the eTag for a file doesn't match the expected one.

As for the second issue, it looks like the version hint file is not valid. We did some work to correct these issues in HDFS that was released in 0.11.0, so I'm surprised to see this. Now, the version hint file is written and then renamed to avoid issues with reads while the file is being written.

I'm not sure how you hit the second issue on S3, but the solution is probably the same as for the eTag issue: I recommend moving to a metastore to track the current table metadata rather than using the HDFS implementation.

Ryan
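To make the rename requirement above concrete, here is a conceptual sketch of the no-overwrite commit that HadoopTables depends on (simplified and hypothetical, not the actual HadoopTableOperations code):

    import java.util.UUID;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.iceberg.exceptions.CommitFailedException;

    public class RenameCommitSketch {
      static void commitVersion(FileSystem fs, Path metadataDir, int nextVersion) throws Exception {
        // Write the new metadata JSON to a temporary file first...
        Path temp = new Path(metadataDir, UUID.randomUUID() + ".metadata.json");
        // ...then rename it to the next version. On HDFS this rename is atomic
        // and fails if the target already exists, so exactly one committer wins.
        // On S3, a "rename" is a copy followed by a delete, with no such guarantee.
        Path target = new Path(metadataDir, "v" + nextVersion + ".metadata.json");
        if (!fs.rename(temp, target)) {
          throw new CommitFailedException("Version %d was already committed", nextVersion);
        }
      }
    }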
On Tue, Jun 8, 2021 at 5:27 PM Scott Kruger <sckru...@paypal.com.invalid> wrote:

We’re using the Iceberg API (0.11.1) over raw Parquet data in S3/EMRFS, basically just using the table API to issue overwrites/appends. Everything works great for the most part, but we’ve recently started to have problems with the Iceberg metadata directory going out of sync. See the following stacktrace:

    org.apache.iceberg.exceptions.RuntimeIOException: Failed to read file: s3://mybucket/db/table/metadata/v2504.metadata.json
        at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:241)
        at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
        at org.apache.iceberg.hadoop.HadoopTableOperations.updateVersionAndMetadata(HadoopTableOperations.java:93)
        at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:116)
        at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
        at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
        at com.braintree.data.common.snapshot.iceberg.IcebergUtils$Builder.load(IcebergUtils.java:639)
        at com.braintree.data.snapshot.actions.UpdateTableMetadata.run(UpdateTableMetadata.java:53)
        at com.braintree.data.snapshot.actions.UpdateMetastore.lambda$run$0(UpdateMetastore.java:104)
        at com.braintree.data.base.util.StreamUtilities.lambda$null$7(StreamUtilities.java:306)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Unexpected end of stream pos=0, contentLength=214601
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:113)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:524)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:247)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1481)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:972)
        at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3242)
        at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:239)
        ... 15 more
    Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/v2504.metadata.json' does not match eTag from S3!
        at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
        ... 27 more

Earlier in my logs, I see the following similar warning:

    21/06/08 23:20:32 pool-117-thread-1 WARN HadoopTableOperations: Error reading version hint file s3://mybucket/db/table/metadata/version-hint.text
    java.io.IOException: Unexpected end of stream pos=0, contentLength=4
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:318)
        at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:99)
        at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
        at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
        … INTERNAL STUFF …
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/version-hint.text' does not match eTag from S3!
        at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
        at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
        ... 25 more

This only happens every once in a while, so my best guess is that there’s some weird eventual-consistency problem or perhaps something with retry logic?

My question is: is there a correct way of using Iceberg on EMRFS? FWIW, I haven’t included the AWS v2 SDK in my classpath.

--
Ryan Blue