Scott, I don't think you can use S3FileIO with HadoopTables, because
HadoopTables requires file system support for operations like atomic rename,
and the FileIO interface is not intended to support those.
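
For reference, the FileIO contract is deliberately small; this is a paraphrase
of org.apache.iceberg.io.FileIO as of 0.11, not the full source. Note that
there is no rename and no directory listing, which is exactly what
HadoopTableOperations relies on:

    // Paraphrased: the only file operations a FileIO must provide.
    public interface FileIO extends Serializable {
      InputFile newInputFile(String path);   // read an existing file by path
      OutputFile newOutputFile(String path); // create or overwrite a file by path
      void deleteFile(String path);          // delete a file by path
    }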

I think a really promising alternative is the DynamoDB Catalog
<https://github.com/apache/iceberg/pull/2688/files> implementation that
Jack Ye just submitted (still under review).
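
If it lands as proposed, usage would look roughly like the sketch below. The
class name (org.apache.iceberg.aws.dynamodb.DynamoDbCatalog) and the property
keys are taken from the PR and could change before release:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.aws.dynamodb.DynamoDbCatalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    public class DynamoCatalogExample {
      public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("warehouse", "s3://mybucket/warehouse");
        // route all metadata and data I/O through S3FileIO; no file system needed
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

        DynamoDbCatalog catalog = new DynamoDbCatalog();
        catalog.initialize("dynamo", props);

        Table table = catalog.loadTable(TableIdentifier.of("db", "table"));
        System.out.println(table.currentSnapshot());
      }
    }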

-Dan

On Thu, Jun 10, 2021 at 7:26 AM Scott Kruger <sckru...@paypal.com.invalid>
wrote:

> Going to bump this question then:
>
> > Not using EMRFS for the metadata is an interesting possibility. We’re
> using HadoopTables currently; is there a Tables implementation that uses
> S3FileIO that we can use, or can I somehow tell HadoopTables to use
> S3FileIO?
>
>
>
> *From: *Ryan Blue <b...@apache.org>
> *Reply-To: *"dev@iceberg.apache.org" <dev@iceberg.apache.org>
> *Date: *Wednesday, June 9, 2021 at 4:10 PM
> *To: *"dev@iceberg.apache.org" <dev@iceberg.apache.org>
> *Subject: *Re: Consistency problems with Iceberg + EMRFS
>
>
>
> Thanks for the additional detail. If you're not writing concurrently, then
> that eliminates the explanations I had in mind. I also don't think Iceberg
> retries are the problem: Iceberg only retries when a commit fails, and with
> nothing else modifying the table there is no reason for a commit to fail. To
> make sure, you can check for "Retrying" logs from Iceberg.
>
>
>
> Now that I'm looking more closely at the second error, I see that it is
> also caused by the eTag mismatch. I wonder if this might be a different
> level of retry. Maybe EMRFS has a transient error and that causes an
> internal retry on the write that is the source of the consistency error?
>
>
>
> One thing you may be able to do to solve this is to use S3FileIO instead of
> EMRFS.
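>
> A minimal sketch of reading through S3FileIO directly, assuming iceberg-aws
> and the AWS SDK v2 are on the classpath (the path below is just your example
> metadata file):
>
>     import org.apache.iceberg.aws.s3.S3FileIO;
>     import org.apache.iceberg.io.InputFile;
>     import software.amazon.awssdk.services.s3.S3Client;
>
>     public class S3FileIORead {
>       public static void main(String[] args) {
>         // supply an S3 client directly; no Hadoop FileSystem or EMRFS involved
>         S3FileIO io = new S3FileIO(S3Client::create);
>         InputFile metadata =
>             io.newInputFile("s3://mybucket/db/table/metadata/v2504.metadata.json");
>         System.out.println("metadata length: " + metadata.getLength());
>       }
>     }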
>
>
>
> Ryan
>
>
>
> On Wed, Jun 9, 2021 at 9:02 AM Scott Kruger <sckru...@paypal.com.invalid>
> wrote:
>
> Here’s a little more detail on our use case that might be helpful. We’re
> running a batch process to apply CDC to several hundred tables every few
> hours; we use iceberg (via HadoopTables) on top of a traditional Hive
> external table model (EMRFS + parquet + glue metastore) to track the commits
> (that is, changes to the list of files) to these tables. There are a number
> of technical and “political” reasons for this that don’t really bear going
> into; all we really needed was a way to track the files belonging to a table
> that are managed via some process external to iceberg. We have a few
> guarantees:
>
>
>
>    - Tables never, *ever* see concurrent writes; only one application
>    writes to these tables, and only one instance of this application ever
>    exists at any time
>    - Our application rewrites entire partitions to new directories, so we
>    don’t need iceberg to help us read a handful of files from directories with
>    files from multiple commits
>    - Our interaction with the iceberg API is *extremely* limited:
>
>
>
> overwrite = table.newOverwrite()
> for each updated partition
>     for each file in old partition directory
>         overwrite.deleteFile(file)
>     for each file in new partition directory
>         overwrite.addFile(file)
> overwrite.commit()
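>
> In Java against the Iceberg API, that flow is roughly the sketch below;
> PartitionUpdate, oldFiles(), and newFiles() are hypothetical stand-ins for
> "list the old/new partition directories", not Iceberg API:
>
>     import org.apache.iceberg.DataFile;
>     import org.apache.iceberg.OverwriteFiles;
>     import org.apache.iceberg.Table;
>
>     static void commitRewrite(Table table, Iterable<PartitionUpdate> updates) {
>       OverwriteFiles overwrite = table.newOverwrite();
>       for (PartitionUpdate partition : updates) {
>         for (DataFile file : partition.oldFiles()) {
>           overwrite.deleteFile(file);  // drop every file from the old directory
>         }
>         for (DataFile file : partition.newFiles()) {
>           overwrite.addFile(file);     // register every file from the new directory
>         }
>       }
>       overwrite.commit();              // one atomic metadata commit per batch
>     }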
>
>
>
> So, all that being said, now to address your comments. We don’t have
> concurrent processes writing commits, so the problem has to be contained in
> the pseudocode block above. We don’t ever have consistency issues with the
> actual data files we write (using plain spark DataFrameWriter.parquet), so
> there has to be something going on with how iceberg writes metadata over
> EMRFS. It feels like retry logic is a likely culprit, as this only happens
> about once a day across something like 10000 commits. Using the metastore is
> unfortunately a non-starter for us, but given that we don’t need to support
> concurrent writes, I don’t think that’s a problem.
>
>
>
> Not using EMRFS for the metadata is an interesting possibility. We’re
> using HadoopTables currently; is there a Tables implementation that uses
> S3FileIO that we can use, or can I somehow tell HadoopTables to use
> S3FileIO?
>
>
>
> *From: *Jack Ye <yezhao...@gmail.com>
> *Reply-To: *"dev@iceberg.apache.org" <dev@iceberg.apache.org>
> *Date: *Tuesday, June 8, 2021 at 7:49 PM
> *To: *"dev@iceberg.apache.org" <dev@iceberg.apache.org>
> *Subject: *Re: Consistency problems with Iceberg + EMRFS
>
>
>
> There are 2 potential root causes I see:
>
> 1. You might be using EMRFS with DynamoDB enabled to check consistency,
> which can leave DynamoDB and S3 out of sync. The quick solution is to just
> delete the DynamoDB consistency table; the next read/write will recreate and
> resync it (a sketch follows point 2). After all, EMRFS only provides
> read-after-write consistency for S3, and S3 is now strongly consistent on
> its own, so there is really no need to use EMRFS anymore.
>
> 2. HadoopCatalog on S3 always has the possibility of one process clobbering
> another when writing the version-hint.text file. So, as Ryan suggested, it
> is always better to use a metastore to perform consistency checks instead of
> delegating them to the file system.
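>
> For point 1, a minimal sketch of dropping the consistency table with the AWS
> SDK v2; the table name "EmrFSMetadata" is the EMRFS default and an assumption
> here, so check fs.s3.consistent.metadata.tableName in your cluster config:
>
>     import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
>     import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
>
>     public class DropEmrfsMetadata {
>       public static void main(String[] args) {
>         try (DynamoDbClient dynamo = DynamoDbClient.create()) {
>           // EMRFS recreates and resyncs this table on the next read/write
>           dynamo.deleteTable(
>               DeleteTableRequest.builder().tableName("EmrFSMetadata").build());
>         }
>       }
>     }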
>
>
>
> -Jack
>
>
>
> On Tue, Jun 8, 2021 at 5:41 PM Ryan Blue <b...@apache.org> wrote:
>
> Hi Scott,
>
>
>
> I'm not quite sure what's happening here, but I should at least note that
> we didn't intend for HDFS tables to be used with S3. HDFS tables use an
> atomic rename in the file system to ensure that only one committer "wins"
> to produce a given version of the table metadata. In S3, renames are not
> atomic, so you can get into trouble if two concurrent processes try to
> rename to the same target version. That's probably what's causing the first
> issue, where the eTag for a file doesn't match the expected one.
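>
> Roughly, the commit path looks like the following simplified sketch (not the
> exact HadoopTableOperations source; names are abbreviated):
>
>     import java.util.UUID;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.iceberg.TableMetadata;
>     import org.apache.iceberg.TableMetadataParser;
>     import org.apache.iceberg.exceptions.CommitFailedException;
>     import org.apache.iceberg.io.FileIO;
>
>     static void commitNewVersion(FileSystem fs, FileIO io, Path metadataDir,
>                                  TableMetadata updated, int nextVersion)
>         throws Exception {
>       Path temp = new Path(metadataDir, UUID.randomUUID() + ".metadata.json");
>       Path next = new Path(metadataDir, "v" + nextVersion + ".metadata.json");
>       TableMetadataParser.write(updated, io.newOutputFile(temp.toString()));
>       // On HDFS this rename is atomic, so exactly one committer claims vN.
>       // On S3 rename is copy+delete, so two committers can both "succeed".
>       if (!fs.rename(temp, next)) {
>         throw new CommitFailedException(
>             "Version %d was committed concurrently", nextVersion);
>       }
>     }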
>
>
>
> As for the second issue, it looks like the version hint file is not valid.
> We did some work to correct these issues in HDFS that was released in
> 0.11.0, so I'm surprised to see this. Now, the version hint file is written
> and then renamed to avoid issues with reads while the file is being written.
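>
> The hint write is roughly this sketch (simplified; the temp file name is
> illustrative, not the exact source):
>
>     import java.nio.charset.StandardCharsets;
>     import org.apache.hadoop.fs.FSDataOutputStream;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     static void writeVersionHint(FileSystem fs, Path metadataDir, int version)
>         throws Exception {
>       Path temp = new Path(metadataDir, ".version-hint.text.tmp");
>       Path hint = new Path(metadataDir, "version-hint.text");
>       try (FSDataOutputStream out = fs.create(temp, true /* overwrite */)) {
>         out.write(String.valueOf(version).getBytes(StandardCharsets.UTF_8));
>       }
>       fs.delete(hint, false /* recursive */);
>       fs.rename(temp, hint);  // readers see the old hint or the new, not a partial write
>     }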
>
>
>
> I'm not sure how you had the second issue on S3, but the solution is
> probably the same as for the eTag issue: I recommend moving to a metastore
> to track the current table metadata rather than using the HDFS
> implementation.
>
>
>
> Ryan
>
>
>
> On Tue, Jun 8, 2021 at 5:27 PM Scott Kruger <sckru...@paypal.com.invalid>
> wrote:
>
> We’re using the Iceberg API (0.11.1) over raw parquet data in S3/EMRFS,
> basically just using the table API to issue overwrites/appends. Everything
> works great for the most part, but we’ve recently started to have problems
> with the iceberg metadata directory going out of sync. See the following
> stacktrace:
>
>
>
> org.apache.iceberg.exceptions.RuntimeIOException: Failed to read file: s3://mybucket/db/table/metadata/v2504.metadata.json
>   at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:241)
>   at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:233)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.updateVersionAndMetadata(HadoopTableOperations.java:93)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:116)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
>   at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
>   at com.braintree.data.common.snapshot.iceberg.IcebergUtils$Builder.load(IcebergUtils.java:639)
>   at com.braintree.data.snapshot.actions.UpdateTableMetadata.run(UpdateTableMetadata.java:53)
>   at com.braintree.data.snapshot.actions.UpdateMetastore.lambda$run$0(UpdateMetastore.java:104)
>   at com.braintree.data.base.util.StreamUtilities.lambda$null$7(StreamUtilities.java:306)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Unexpected end of stream pos=0, contentLength=214601
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
>   at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.read(HadoopStreams.java:113)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:524)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:247)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1481)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:972)
>   at org.apache.iceberg.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3242)
>   at org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:239)
>   ... 15 more
> Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/v2504.metadata.json' does not match eTag from S3!
>   at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
>   ... 27 more
>
>
>
> Earlier in my logs, I see the following similar warning:
>
>
>
> 21/06/08 23:20:32 pool-117-thread-1 WARN HadoopTableOperations: Error reading version hint file s3://mybucket/db/table/metadata/version-hint.text
> java.io.IOException: Unexpected end of stream pos=0, contentLength=4
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
>   at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>   at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>   at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>   at java.io.DataInputStream.read(DataInputStream.java:149)
>   at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
>   at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
>   at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>   at java.io.InputStreamReader.read(InputStreamReader.java:184)
>   at java.io.BufferedReader.fill(BufferedReader.java:161)
>   at java.io.BufferedReader.readLine(BufferedReader.java:324)
>   at java.io.BufferedReader.readLine(BufferedReader.java:389)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.findVersion(HadoopTableOperations.java:318)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.refresh(HadoopTableOperations.java:99)
>   at org.apache.iceberg.hadoop.HadoopTableOperations.current(HadoopTableOperations.java:80)
>   at org.apache.iceberg.hadoop.HadoopTables.load(HadoopTables.java:86)
>   … INTERNAL STUFF…
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: com.amazon.ws.emr.hadoop.fs.consistency.exception.ConsistencyException: eTag in metadata for File 'mybucket/db/table/metadata/version-hint.text' does not match eTag from S3!
>   at com.amazon.ws.emr.hadoop.fs.s3.GetObjectInputStreamWithInfoFactory.create(GetObjectInputStreamWithInfoFactory.java:69)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.open(S3FSInputStream.java:200)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:391)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
>   at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
>   ... 25 more
>
>
>
> This only happens every once in a while, so my best guess is that there’s
> some weird eventual-consistency problem, or perhaps something in the retry
> logic?
>
>
>
> My question is: is there a correct way of using iceberg on EMRFS? FWIW, I
> haven’t included the AWS v2 SDK in my classpath.
>
>
>
>
> --
>
> Ryan Blue
>
>
>
>
> --
>
> Ryan Blue
>
