jackye1995 commented on a change in pull request #3376:
URL: https://github.com/apache/iceberg/pull/3376#discussion_r742065378



##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsClientProperties.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+
+/**
+ * Property constants of catalog
+ */
+public interface EcsClientProperties {
+
+  /**
+   * Access key id
+   */
+  String ACCESS_KEY_ID = "ecs.s3.access.key.id";

Review comment:
       just want to point out, in AWS module we decided to not exposing these 
properties related to credentials, because this forces users to hard-code their 
secrets in plain text and is not a good security practice. Also this forces 
users to use a long-term credential instead of a session credential that will 
expire and has lower security burden. That is why we had the client factory 
class so that by default we load credentials using the default credentials 
chain, and users customize the client initialization process directly if they 
want. I don't know enough about ECS service use pattern, maybe this is a common 
use pattern on your side, but just keep in mind about these potential security 
risks.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);
+
+  /**
+   * Blank constructor
+   */
+  public EcsFileIO() {
+  }
+
+  @Override
+  public void initialize(Map<String, String> inputProperties) {
+    if (closed.compareAndSet(true, false)) {
+      this.properties = Collections.unmodifiableMap(new 
LinkedHashMap<>(inputProperties));

Review comment:
       prefer to directly use `ImmutableMap`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);

Review comment:
       I think typically we don't worry about re-initializing the class and 
just assume it is only initialized once. Is there any particular use case for 
having this atomic check for every operation?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);

Review comment:
       nit: "ECS object already exists: %s"

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       why not directly use `ByteBuffer.allocate`? Then you don't need to 
convert it back to byte buffer in the constructor.  Also would it be more 
beneficial to allocate 1024 vs 1000?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And I think we prefer to pass ByteBuffer as argument instead of direct 
byte array. So could you at least move the `ByteBuffer.wrap` to this level, or 
the constructor don't really need to have this argument since you always create 
in a new byte buffer of fixed length, so you can directly create it in the 
constructor instead.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And could you make your buffer size a private static constant for 
readability, thanks!

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And could you make your buffer size a private static static constant for 
readability, thanks!

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>

Review comment:
       list javadoc is done in this way: 
https://stackoverflow.com/questions/6478777/how-to-create-multiple-levels-of-indentation-in-javadoc

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>
+ * 1. The stream is only be loaded when start reading.
+ * <p>
+ * 2. This class won't cache any bytes of content. It only maintains pos of 
{@link SeekableInputStream}
+ * <p>
+ * 3. This class is not thread-safe.
+ */
+class EcsSeekableInputStream extends SeekableInputStream {
+
+  private final S3Client client;
+  private final EcsURI uri;
+
+  /**
+   * Mutable pos set by {@link #seek(long)}
+   */
+  private long newPos = 0;
+  /**
+   * Current pos of object content
+   */
+  private long pos = -1;
+  private InputStream internal;

Review comment:
       nit: name this a a noun, such as `stream` or `delegate`

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>
+ * 1. The stream is only be loaded when start reading.
+ * <p>
+ * 2. This class won't cache any bytes of content. It only maintains pos of 
{@link SeekableInputStream}
+ * <p>
+ * 3. This class is not thread-safe.
+ */
+class EcsSeekableInputStream extends SeekableInputStream {
+
+  private final S3Client client;
+  private final EcsURI uri;
+
+  /**
+   * Mutable pos set by {@link #seek(long)}
+   */
+  private long newPos = 0;
+  /**
+   * Current pos of object content
+   */
+  private long pos = -1;
+  private InputStream internal;

Review comment:
       nit: name this a a noun, such as `stream` or `delegate` or 
`internalStream`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsURI.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.util.Objects;
+
+/**
+ * An immutable record class of ECS location
+ */
+public class EcsURI {
+
+  private final String bucket;
+  private final String name;
+
+  public EcsURI(String bucket, String name) {
+    if (bucket == null || name == null) {
+      throw new IllegalArgumentException(String.format("bucket %s and key %s 
must be not null", bucket, name));

Review comment:
       nit: must not be null. Capitalize Bucket. Also prefer to use 
`Preconditions.checkNotNull` and check these fields individually.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/LocationUtils.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class LocationUtils {

Review comment:
       I think we can just move these inside the `EcsUri` class? We can just 
make `checkAndParseLocation` a constructor for `EcsUri`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/LocationUtils.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class LocationUtils {
+
+  private LocationUtils() {
+  }
+
+  private static final Set<String> VALID_SCHEME = ImmutableSet.of("ecs", "s3", 
"s3a", "s3n");
+
+  public static String toString(String bucket, String name) {
+    return String.format("ecs://%s/%s", bucket, name);
+  }
+
+  public static Optional<EcsURI> parseLocation(String location) {

Review comment:
       I did not find any place this is used publicly

##########
File path: 
dell/src/test/java/org/apache/iceberg/dell/EcsAppendOutputStreamTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.dell.mock.EcsS3MockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EcsAppendOutputStreamTest {
+
+  @Rule
+  public EcsS3MockRule rule = EcsS3MockRule.create();
+
+  @Test
+  public void generalTest() throws IOException {
+    String objectName = "test";
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 1 byte
+      output.write('1');
+      // write 3 bytes
+      output.write("123".getBytes());
+      // write 7 bytes, totally 11 bytes > local buffer limit (10)
+      output.write("1234567".getBytes());
+      // write 11 bytes, flush remain 7 bytes and new 11 bytes
+      output.write("12345678901".getBytes());
+    }
+
+    try (InputStream input = 
rule.getClient().readObjectStream(rule.getBucket(), objectName,
+        Range.fromOffset(0))) {
+      assertEquals("object content", "1" + "123" + "1234567" + "12345678901",
+          IOUtils.toString(input, StandardCharsets.US_ASCII));
+    }
+  }
+
+  @Test
+  public void rewrite() throws IOException {
+    String objectName = "test";
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 7 bytes
+      output.write("7654321".getBytes());
+    }
+
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 14 bytes
+      output.write("1234567".getBytes());
+      output.write("1234567".getBytes());
+    }
+
+    try (InputStream input = 
rule.getClient().readObjectStream(rule.getBucket(), objectName,
+        Range.fromOffset(0))) {
+      assertEquals("object content", "1234567" + "1234567",
+          IOUtils.toString(input, StandardCharsets.US_ASCII));

Review comment:
       always prefer UTF8 as charset

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {

Review comment:
       I think we follow the pattern of (1) directly serialize an object like 
`client` as a function, example: 
https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L42-L45.
 (2) if absolutely necessary, directly overwrite serialization private methods 
instead of using `Externalize`. Is it possible for you to use the same pattern 
here?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);

Review comment:
       I think we added an atomic check in `S3FileIO` because the client needs 
to be closed only once. But that's the only use, we don't check it for every 
method, and we also initialize it to false and assume it's always by default 
open.

##########
File path: build.gradle
##########
@@ -566,6 +566,23 @@ project(':iceberg-nessie') {
   }
 }
 
+project(':iceberg-dell') {
+  dependencies {
+    implementation project(':iceberg-core')
+    implementation project(path: ':iceberg-bundled-guava', configuration: 
'shadow')
+    implementation 'com.emc.ecs:object-client-bundle'
+
+    testImplementation "org.slf4j:jcl-over-slf4j"

Review comment:
       why is this needed? I don't see logging used in test.

##########
File path: dell/src/test/java/org/apache/iceberg/dell/EcsFileTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;

Review comment:
       prefers Guava ByteStreams

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsClientProperties.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+
+/**
+ * Property constants of catalog
+ */
+public interface EcsClientProperties {
+
+  /**
+   * Access key id
+   */
+  String ACCESS_KEY_ID = "ecs.s3.access.key.id";

Review comment:
       just want to point out, in AWS module we decided to not exposing these 
properties related to credentials, because this forces users to hard-code their 
secrets in plain text and is not a good security practice. Also this forces 
users to use a long-term credential instead of a session credential that will 
expire and has lower security burden. That is why we had the client factory 
class so that by default we load credentials using the default credentials 
chain, and users customize the client initialization process directly if they 
want. I don't know enough about ECS service use pattern, maybe this is a common 
use pattern on your side, but just keep in mind about these potential security 
risks.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);
+
+  /**
+   * Blank constructor
+   */
+  public EcsFileIO() {
+  }
+
+  @Override
+  public void initialize(Map<String, String> inputProperties) {
+    if (closed.compareAndSet(true, false)) {
+      this.properties = Collections.unmodifiableMap(new 
LinkedHashMap<>(inputProperties));

Review comment:
       prefer to directly use `ImmutableMap`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);

Review comment:
       I think typically we don't worry about re-initializing the class and 
just assume it is only initialized once. Is there any particular use case for 
having this atomic check for every operation?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);

Review comment:
       nit: "ECS object already exists: %s"

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       why not directly use `ByteBuffer.allocate`? Then you don't need to 
convert it back to byte buffer in the constructor.  Also would it be more 
beneficial to allocate 1024 vs 1000?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And I think we prefer to pass ByteBuffer as argument instead of direct 
byte array. So could you at least move the `ByteBuffer.wrap` to this level, or 
the constructor don't really need to have this argument since you always create 
in a new byte buffer of fixed length, so you can directly create it in the 
constructor instead.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And could you make your buffer size a private static constant for 
readability, thanks!

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsOutputFile.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.PositionOutputStream;
+
+public class EcsOutputFile implements OutputFile {
+
+  private final S3Client client;
+  private final String location;
+  private final EcsURI uri;
+
+  public EcsOutputFile(S3Client client, String location) {
+    this.client = client;
+    this.location = location;
+    this.uri = LocationUtils.checkAndParseLocation(location);
+  }
+
+  /**
+   * Check object existence and then create a {@link PositionOutputStream}
+   *
+   * @return Output stream of object
+   */
+  @Override
+  public PositionOutputStream create() {
+    if (!toInputFile().exists()) {
+      return createOrOverwrite();
+    } else {
+      throw new AlreadyExistsException("The object is existed, %s", uri);
+    }
+  }
+
+  @Override
+  public PositionOutputStream createOrOverwrite() {
+    // use built-in 1 KiB byte buffer
+    return new EcsAppendOutputStream(client, uri, new byte[1_000]);

Review comment:
       And could you make your buffer size a private static static constant for 
readability, thanks!

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>

Review comment:
       list javadoc is done in this way: 
https://stackoverflow.com/questions/6478777/how-to-create-multiple-levels-of-indentation-in-javadoc

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>
+ * 1. The stream is only be loaded when start reading.
+ * <p>
+ * 2. This class won't cache any bytes of content. It only maintains pos of 
{@link SeekableInputStream}
+ * <p>
+ * 3. This class is not thread-safe.
+ */
+class EcsSeekableInputStream extends SeekableInputStream {
+
+  private final S3Client client;
+  private final EcsURI uri;
+
+  /**
+   * Mutable pos set by {@link #seek(long)}
+   */
+  private long newPos = 0;
+  /**
+   * Current pos of object content
+   */
+  private long pos = -1;
+  private InputStream internal;

Review comment:
       nit: name this a a noun, such as `stream` or `delegate`

##########
File path: 
dell/src/main/java/org/apache/iceberg/dell/EcsSeekableInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import com.emc.object.s3.S3Client;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.iceberg.io.SeekableInputStream;
+
+/**
+ * A {@link SeekableInputStream} impl that warp {@link 
S3Client#readObjectStream(String, String, Range)}
+ * <p>
+ * 1. The stream is only be loaded when start reading.
+ * <p>
+ * 2. This class won't cache any bytes of content. It only maintains pos of 
{@link SeekableInputStream}
+ * <p>
+ * 3. This class is not thread-safe.
+ */
+class EcsSeekableInputStream extends SeekableInputStream {
+
+  private final S3Client client;
+  private final EcsURI uri;
+
+  /**
+   * Mutable pos set by {@link #seek(long)}
+   */
+  private long newPos = 0;
+  /**
+   * Current pos of object content
+   */
+  private long pos = -1;
+  private InputStream internal;

Review comment:
       nit: name this a a noun, such as `stream` or `delegate` or 
`internalStream`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsURI.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.util.Objects;
+
+/**
+ * An immutable record class of ECS location
+ */
+public class EcsURI {
+
+  private final String bucket;
+  private final String name;
+
+  public EcsURI(String bucket, String name) {
+    if (bucket == null || name == null) {
+      throw new IllegalArgumentException(String.format("bucket %s and key %s 
must be not null", bucket, name));

Review comment:
       nit: must not be null. Capitalize Bucket. Also prefer to use 
`Preconditions.checkNotNull` and check these fields individually.

##########
File path: dell/src/main/java/org/apache/iceberg/dell/LocationUtils.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class LocationUtils {

Review comment:
       I think we can just move these inside the `EcsUri` class? We can just 
make `checkAndParseLocation` a constructor for `EcsUri`

##########
File path: dell/src/main/java/org/apache/iceberg/dell/LocationUtils.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+
+public class LocationUtils {
+
+  private LocationUtils() {
+  }
+
+  private static final Set<String> VALID_SCHEME = ImmutableSet.of("ecs", "s3", 
"s3a", "s3n");
+
+  public static String toString(String bucket, String name) {
+    return String.format("ecs://%s/%s", bucket, name);
+  }
+
+  public static Optional<EcsURI> parseLocation(String location) {

Review comment:
       I did not find any place this is used publicly

##########
File path: 
dell/src/test/java/org/apache/iceberg/dell/EcsAppendOutputStreamTest.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.Range;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.iceberg.dell.mock.EcsS3MockRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EcsAppendOutputStreamTest {
+
+  @Rule
+  public EcsS3MockRule rule = EcsS3MockRule.create();
+
+  @Test
+  public void generalTest() throws IOException {
+    String objectName = "test";
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 1 byte
+      output.write('1');
+      // write 3 bytes
+      output.write("123".getBytes());
+      // write 7 bytes, totally 11 bytes > local buffer limit (10)
+      output.write("1234567".getBytes());
+      // write 11 bytes, flush remain 7 bytes and new 11 bytes
+      output.write("12345678901".getBytes());
+    }
+
+    try (InputStream input = 
rule.getClient().readObjectStream(rule.getBucket(), objectName,
+        Range.fromOffset(0))) {
+      assertEquals("object content", "1" + "123" + "1234567" + "12345678901",
+          IOUtils.toString(input, StandardCharsets.US_ASCII));
+    }
+  }
+
+  @Test
+  public void rewrite() throws IOException {
+    String objectName = "test";
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 7 bytes
+      output.write("7654321".getBytes());
+    }
+
+    try (EcsAppendOutputStream output = new EcsAppendOutputStream(
+        rule.getClient(),
+        new EcsURI(rule.getBucket(), objectName),
+        new byte[10])) {
+      // write 14 bytes
+      output.write("1234567".getBytes());
+      output.write("1234567".getBytes());
+    }
+
+    try (InputStream input = 
rule.getClient().readObjectStream(rule.getBucket(), objectName,
+        Range.fromOffset(0))) {
+      assertEquals("object content", "1234567" + "1234567",
+          IOUtils.toString(input, StandardCharsets.US_ASCII));

Review comment:
       always prefer UTF8 as charset

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {

Review comment:
       I think we follow the pattern of (1) directly serialize an object like 
`client` as a function, example: 
https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L42-L45.
 (2) if absolutely necessary, directly overwrite serialization private methods 
instead of using `Externalize`. Is it possible for you to use the same pattern 
here?

##########
File path: dell/src/main/java/org/apache/iceberg/dell/EcsFileIO.java
##########
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import com.emc.object.s3.S3Client;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link java.io.Externalizable} FileIO of ECS S3 object client.
+ */
+public class EcsFileIO implements FileIO, Externalizable, AutoCloseable {
+
+  private static final Logger log = LoggerFactory.getLogger(EcsFileIO.class);
+
+  /**
+   * Saved properties for {@link java.io.Serializable}
+   */
+  private Map<String, String> properties;
+  private S3Client client;
+  private final AtomicBoolean closed = new AtomicBoolean(true);

Review comment:
       I think we added an atomic check in `S3FileIO` because the client needs 
to be closed only once. But that's the only use, we don't check it for every 
method, and we also initialize it to false and assume it's always by default 
open.

##########
File path: build.gradle
##########
@@ -566,6 +566,23 @@ project(':iceberg-nessie') {
   }
 }
 
+project(':iceberg-dell') {
+  dependencies {
+    implementation project(':iceberg-core')
+    implementation project(path: ':iceberg-bundled-guava', configuration: 
'shadow')
+    implementation 'com.emc.ecs:object-client-bundle'
+
+    testImplementation "org.slf4j:jcl-over-slf4j"

Review comment:
       why is this needed? I don't see logging used in test.

##########
File path: dell/src/test/java/org/apache/iceberg/dell/EcsFileTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.dell;
+
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;

Review comment:
       prefers Guava ByteStreams




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to