Here’s a little more detail on our use case that might be helpful. We’re
running a batch process to apply CDC to several hundred tables every few hours;
we use Iceberg (via HadoopTables) on top of a traditional Hive external table
model (EMRFS + Parquet + Glue metastore) to track the commits (that is,
changes to the list of files) to these tables. There are a number of technical
and “political” reasons for this that don’t really bear going into; all we
really needed was a way to track the files belonging to a table that are managed
by some process external to Iceberg. We have a few guarantees:
* Tables never, ever see concurrent writes; only one application writes to
these tables, and only one instance of this application ever exists at any time
* Our application rewrites entire partitions to new directories, so we
don’t need Iceberg to help us read a handful of files from directories
containing files from multiple commits
* Our interaction with the Iceberg API is extremely limited:
    overwrite = table.newOverwrite()
    for each updated partition
        for each file in old partition directory
            overwrite.deleteFile(file)
        for each file in new partition directory
            overwrite.addFile(file)
    overwrite.commit()
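In terms of the actual Iceberg API, that pseudocode amounts to roughly the
following sketch (listDataFiles, which turns a directory listing into DataFile
objects, and UpdatedPartition are hypothetical stand-ins for our own code):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.OverwriteFiles;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    Table table = new HadoopTables(new Configuration()).load("s3://mybucket/db/table");

    // One overwrite per batch: drop every file from each rewritten partition's old
    // directory, register every file from its new directory, then commit once.
    OverwriteFiles overwrite = table.newOverwrite();
    for (UpdatedPartition partition : updatedPartitions) {
        for (DataFile file : listDataFiles(partition.oldDirectory())) {
            overwrite.deleteFile(file);
        }
        for (DataFile file : listDataFiles(partition.newDirectory())) {
            overwrite.addFile(file);
        }
    }
    overwrite.commit();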
So, all that being said, now to address your comments. We don’t have concurrent
processes writing commits, so the problem has to be contained in the pseudocode
block above. We never have consistency issues with the actual data files we
write (using plain Spark DataFrameWriter.parquet), so there must be something
going on with how Iceberg writes its metadata over EMRFS. It feels like retry
logic is a likely culprit, since this only happens about once a day across
something like 10,000 commits. Using the metastore is unfortunately a
non-starter for us, but given that we don’t need to support concurrent writes,
I don’t think that’s a problem.
Not using EMRFS for the metadata is an interesting possibility. We’re using
HadoopTables currently; is there a Tables implementation that uses S3FileIO
that we can use, or can I somehow tell HadoopTables to use S3FileIO?
From: Jack Ye <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, June 8, 2021 at 7:49 PM
To: "[email protected]" <[email protected]>
Subject: Re: Consistency problems with Iceberg + EMRFS
There are 2 potential root causes I see:
1. You might be using EMRFS with consistent view (DynamoDB) enabled to check
consistency, which can leave the DynamoDB table and S3 out of sync. The quick
solution is to just delete the DynamoDB consistency table; the next read/write
will recreate and resync it (a quick sketch of this follows the second point
below). After all, EMRFS consistent view only provides read-after-write
consistency for S3, and S3 is now already strongly consistent, so there is
really no need to use it anymore.
2. HadoopCatalog on S3 always has the possibility of one process clobbering
another when writing the version-hint.text file. So, as Ryan suggested, it
is always better to use a metastore to perform consistency checks instead of
delegating them to the file system.
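For the first point, a minimal sketch of dropping the consistency table with
the AWS Java SDK (this assumes the default EMRFS metadata table name,
EmrFSMetadata; the emrfs CLI's delete-metadata command does the same thing):

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

    // Drop the EMRFS consistent-view table; EMRFS recreates and resyncs it on the
    // next read/write. Adjust the name if fs.s3.consistent.metadata.tableName is overridden.
    AmazonDynamoDB dynamo = AmazonDynamoDBClientBuilder.defaultClient();
    dynamo.deleteTable("EmrFSMetadata");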
-Jack
On Tue, Jun 8, 2021 at 5:41 PM Ryan Blue <[email protected]> wrote:
Hi Scott,
I'm not quite sure what's happening here, but I should at least note that we
didn't intend for HDFS tables to be used with S3. HDFS tables use an atomic
rename in the file system to ensure that only one committer "wins" to produce a
given version of the table metadata. In S3, renames are not atomic so you can
get into trouble if there are two concurrent processes trying to rename to the
same target version. That's probably what's causing the first issue, where the
eTag for a file doesn't match the expected one.
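(Conceptually, the HDFS commit path looks something like the sketch below; this
is a simplification rather than the actual HadoopTableOperations code, and
writeMetadata is a hypothetical helper.)

    import java.util.UUID;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.iceberg.exceptions.CommitFailedException;

    // Write the new metadata to a temporary file, then try to atomically rename it
    // to the next versioned file name. On HDFS exactly one committer wins the rename;
    // on S3 a "rename" is a copy plus delete, so this check is not atomic.
    Path temp = new Path(metadataDir, UUID.randomUUID() + ".metadata.json");
    writeMetadata(temp, newMetadata);  // hypothetical helper
    Path next = new Path(metadataDir, "v" + (currentVersion + 1) + ".metadata.json");
    if (!fs.rename(temp, next)) {
        throw new CommitFailedException("Version file already exists: %s", next);
    }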
As for the second issue, it looks like the version hint file is not valid. We
did some work, released in 0.11.0, to correct these issues for HDFS, so I'm
surprised to see this. Now, the version hint file is written to a temporary
location and then renamed to avoid issues with reads while the file is being
written.
I'm not sure how you had the second issue on S3, but the solution is probably
the same as for the eTag issue: I recommend moving to a metastore to track the
current table metadata rather than using the HDFS implementation.
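For reference, a minimal sketch of what that can look like with the Glue
catalog from the iceberg-aws module (the warehouse path and table identifier
here are placeholders, and the property keys are the standard catalog
properties as I understand them):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.aws.glue.GlueCatalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    // The catalog tracks the current metadata pointer in Glue instead of relying on
    // file system rename semantics, and uses S3FileIO for the metadata files.
    Map<String, String> properties = new HashMap<>();
    properties.put("warehouse", "s3://mybucket/warehouse");
    properties.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
    GlueCatalog catalog = new GlueCatalog();
    catalog.initialize("glue", properties);
    Table table = catalog.loadTable(TableIdentifier.of("db", "table"));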
Ryan
On Tue, Jun 8, 2021 at 5:27 PM Scott Kruger <[email protected]> wrote:
We’re using the Iceberg API (0.11.1) over raw Parquet data in S3/EMRFS,
basically just using the table API to issue overwrites/appends. Everything
works great for the most part, but we’ve recently started to have problems with
the Iceberg metadata directory going out of sync. See the following stacktrace:
org.apache.iceberg.exceptions.RuntimeIOException: Failed to read file: s3://mybucket/db/table/metadata/v2504.metadata.json
    at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:241)
    at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
    at org.apache.iceberg.hadoop.HadoopTableOperations.updateVersionAndMetadata(HadoopTableOperations.java:93)
    at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:116)
    at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
    at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
    at com.braintree.data.common.snapshot.iceberg.IcebergUtils$Builder.load(IcebergUtils.java:639)
    at com.braintree.data.snapshot.actions.UpdateTableMetadata.run(UpdateTableMetadata.java:53)
    at com.braintree.data.snapshot.actions.UpdateMetastore.lambda$run$0(UpdateMetastore.java:104)
    at com.braintree.data.base.util.StreamUtilities.lambda$null$7(StreamUtilities.java:306)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Unexpected end of stream pos=0, contentLength=214601
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:113)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:524)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:247)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1481)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:972)
    at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3242)
    at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:239)
    ... 15 more
Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/v2504.metadata.json' does not match eTag from S3!
    at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
    ... 27 more
Earlier in my logs, I see the following similar warning:
21/06/08 23:20:32 pool-117-thread-1 WARN HadoopTableOperations: Error reading version hint file s3://mybucket/db/table/metadata/version-hint.text
java.io.IOException: Unexpected end of stream pos=0, contentLength=4
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:318)
    at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:99)
    at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
    at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
    ... INTERNAL STUFF ...
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/version-hint.text' does not match eTag from S3!
    at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
    at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
    ... 25 more
This only happens every once in a while, so my best guess is that there’s some
weird eventual consistency problem, or perhaps something to do with retry logic?
My question is: is there a correct way of using Iceberg on EMRFS? FWIW, I
haven’t included the AWS v2 SDK in my classpath.
--
Ryan Blue