rdblue commented on a change in pull request #3424:
URL: https://github.com/apache/iceberg/pull/3424#discussion_r740328866
##########
File path: api/src/main/java/org/apache/iceberg/rest/CreateNamespaceRequest.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+
+/**
+ * Represents a REST request to create a namespace / database.
+ */
+public class CreateNamespaceRequest implements Serializable {
+
+  // TODO - Use protected so users can extend this for their own impls. Or an interface.
+  //   Currently anything but private causes an error.
+  private String namespaceName;
+  private Map<String, String> properties;
+
+  private CreateNamespaceRequest() {
+
+  }
+
+  private CreateNamespaceRequest(String namespaceName, Map<String, String> properties) {
+    this.namespaceName = namespaceName;
+    this.properties = properties;
+  }
+
+  /**
+   * Name of the database to create.
+   */
+  String getNamespaceName() {
+    return namespaceName;
+  }
+
+  void setNamespaceName(String name) {
+    this.namespaceName = name;

Review comment:
   I think this depends on how we want to specify the protocol. I prefer documenting the JSON objects that will be sent, which probably means going through and creating serializers/deserializers for each object.
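As a sketch of that serializer/deserializer direction (illustrative only: the `namespace`/`properties` field names, the `getProperties()` accessor, and the package placement are assumptions, not part of this PR), a hand-written Jackson serializer that fixes the documented wire shape might look like:

```java
package org.apache.iceberg.rest;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import java.util.Map;

public class CreateNamespaceRequestSerializer extends JsonSerializer<CreateNamespaceRequest> {
  @Override
  public void serialize(CreateNamespaceRequest request, JsonGenerator gen, SerializerProvider serializers)
      throws IOException {
    // Write the documented JSON object explicitly so the wire format is fixed
    // by the serializer, not by field reflection.
    gen.writeStartObject();
    gen.writeStringField("namespace", request.getNamespaceName());
    // getProperties() is assumed; only the namespace accessor appears in this excerpt.
    gen.writeObjectFieldStart("properties");
    for (Map.Entry<String, String> entry : request.getProperties().entrySet()) {
      gen.writeStringField(entry.getKey(), entry.getValue());
    }
    gen.writeEndObject();
    gen.writeEndObject();
  }
}
```

Writing the object out field by field like this makes the JSON shape something that can be documented and versioned independently of the Java class layout.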
##########
File path: api/src/main/java/org/apache/iceberg/rest/CreateNamespaceResponse.java
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Represents a REST response to a create namespace / database request.
+ */
+public class CreateNamespaceResponse {
+  private Namespace namespace;
+  private Map<String, String> properties;

Review comment:
   Yeah, I meant #2: all properties of the namespace, including the ones the user sent and the ones that were added automatically. We should document this in the class Javadoc.
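A possible shape for that Javadoc, as a sketch (the server-side-defaults example is illustrative):

```java
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;

/**
 * Represents a REST response to a create namespace / database request.
 * <p>
 * {@code properties} contains all properties of the namespace as created: both
 * the properties the user sent in the request and any that were added
 * automatically (for example, server-side defaults).
 */
public class CreateNamespaceResponse {
  private Namespace namespace;
  private Map<String, String> properties;
}
```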
##########
File path: core/src/main/java/org/apache/iceberg/rest/RestTableOperations.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.http.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO - Extract out to an interface - Implement with HTTP version.
+// TODO - Provide Builder interface - Implement with HTTP version.
+// TODO - Should we implement Configurable here? Since this will be an interface, I think not in the interface.
+// TODO - As this will be more of an interface, possibly extend TableOperations directly (like HadoopTableOperations)
+class RestTableOperations extends BaseMetastoreTableOperations implements Closeable, SupportsNamespaces, Configurable {

Review comment:
   `TableOperations` shouldn't implement `SupportsNamespaces`. That's a mix-in for `Catalog`.

##########
File path: core/src/main/java/org/apache/iceberg/rest/RestTableOperations.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.http.HttpClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO - Extract out to an interface - Implement with HTTP version.
+// TODO - Provide Builder interface - Implement with HTTP version.
+// TODO - Should we implement Configurable here? Since this will be an interface, I think not in the interface.
+// TODO - As this will be more of an interface, possibly extend TableOperations directly (like HadoopTableOperations)
+class RestTableOperations extends BaseMetastoreTableOperations implements Closeable, SupportsNamespaces, Configurable {

Review comment:
   `TableOperations` shouldn't implement `SupportsNamespaces`. That's a mix-in for `Catalog`.

   Also, if `FileIO` is passed in, I don't think that we need to make this `Configurable`. There's no need to have a `hadoopConf` other than to build a `FileIO`.
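A sketch of the suggested split, under the assumption that namespace operations move to a catalog class (the `RestCatalog` name is hypothetical) and that `FileIO` is injected so neither `Configurable` nor a Hadoop `Configuration` is needed; method bodies and the remaining abstract methods are elided:

```java
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.io.FileIO;

// Hypothetical sketch: the SupportsNamespaces mix-in belongs on the catalog,
// not on the table operations.
abstract class RestCatalog extends BaseMetastoreCatalog implements SupportsNamespaces {
  // createNamespace, listNamespaces, dropNamespace, ... would live here
}

// The operations class stays a plain TableOperations implementation; with the
// FileIO injected, there is no hadoopConf and no need for Configurable.
abstract class RestTableOperations extends BaseMetastoreTableOperations {
  private final FileIO fileIO;

  RestTableOperations(FileIO fileIO) {
    this.fileIO = fileIO; // built by the caller, e.g. by the catalog
  }

  @Override
  public FileIO io() {
    return fileIO;
  }
}
```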
##########
File path: core/src/main/java/org/apache/iceberg/rest/http/ErrorHandlers.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.rest.http;
+
+import java.util.function.Consumer;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.AuthorizationDeniedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.rest.RestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ErrorHandlers {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class);
+
+  private ErrorHandlers() {
+
+  }
+
+  /**
+   * Table level error handlers.
+   */
+  public static Consumer<CloseableHttpResponse> tableErrorHandler() {
+    return (errorResponse) -> {
+      String responseBody = getResponseBody(errorResponse);
+      String responseException = getIcebergExceptionHeader(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case HttpStatus.SC_NOT_FOUND:
+          // TODO: Exception handling here could be better
+          //   some methods can result in different resource not found exceptions, so here we need to
+          //   differentiate between 404 for non-existent Namespace/Database by looking at X-Iceberg-Exception header.
+          if (NoSuchNamespaceException.class.getSimpleName().equals(responseException)) {
+            throw new NoSuchNamespaceException("Resource not found: %s", responseBody);
+          } else {
+            throw new NoSuchTableException("Resource not found: %s", responseBody);
+          }
+        case HttpStatus.SC_CONFLICT:
+          throw new AlreadyExistsException("Already exists: %s", responseBody);
+        case HttpStatus.SC_FORBIDDEN:
+        case HttpStatus.SC_UNAUTHORIZED:
+          throw new AuthorizationDeniedException("Not Authorized: %s", responseBody);
+        default:
+          throw new RestException("Unknown error: %s", errorResponse);
+      }
+    };
+  }
+
+  /**
+   * Request error handlers specifically for CRUD ops on databases / namespaces.
+   */
+  public static Consumer<CloseableHttpResponse> databaseErrorHandler() {
+    return (errorResponse) -> {
+      String responseBody = getResponseBody(errorResponse);
+
+      switch (errorResponse.getCode()) {
+        case HttpStatus.SC_NOT_FOUND:
+          throw new NoSuchNamespaceException("Namespace not found: %s", responseBody);
+        case HttpStatus.SC_CONFLICT:
+          throw new AlreadyExistsException("Already exists: %s", responseBody);
+        case HttpStatus.SC_FORBIDDEN:
+        case HttpStatus.SC_UNAUTHORIZED:
+          throw new AuthorizationDeniedException("Not Authorized: %s", responseBody);
+        default:
+          throw new RestException("Unknown error: %s", errorResponse);
+      }
+    };
+  }
+
+  static String getResponseBody(CloseableHttpResponse response) {
+    String responseBody = "Non-parseable Response Body";
+    try {
+      responseBody = EntityUtils.toString(response.getEntity());
+    } catch (Exception e) {
+      LOG.error("Encountered an exception getting response body", e);
+    }
+
+    return responseBody;
+  }
+
+  static String getIcebergExceptionHeader(CloseableHttpResponse response) {
+    String icebergException = null;
+    try {
+      // TODO - Extract more specific exception from a header or from the response body?
+      //   Some servers/proxies will strip headers that aren't in the whitelist so possibly headers
+      //   aren't the way to go (though that's typically more on the Request end than the Response end).
+      icebergException = response.getHeader("X-Iceberg-Exception").getValue();
+    } catch (Exception e) {
+      // TODO - Better error message and handling. Will be refactoring this anyway.
+      LOG.error("Encountered an error when getting the X-Iceberg-Exception header", e);

Review comment:
   +1. I'm not sure that we need error codes, but we can always add them if people find them useful.
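On the header read itself: since a missing header is an expected case rather than an error, the catch-all try/catch could be replaced with a null check. A minimal sketch using `getFirstHeader`, which returns null when the header is absent:

```java
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.core5.http.Header;

class IcebergExceptionHeaderSketch {
  static String getIcebergExceptionHeader(CloseableHttpResponse response) {
    // getFirstHeader returns null when the header is absent, so no try/catch
    // is needed for the expected "no header" case.
    Header header = response.getFirstHeader("X-Iceberg-Exception");
    return header != null ? header.getValue() : null;
  }
}
```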
##########
File path: core/src/main/java/org/apache/iceberg/rest/http/HttpClient.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest.http;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
+import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
+import org.apache.hc.client5.http.impl.classic.HttpClients;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpEntity;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.io.entity.StringEntity;
+import org.apache.hc.core5.io.CloseMode;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.rest.RestClient;
+import org.apache.iceberg.rest.UncheckedRestException;
+
+/**
+ * An HttpClient that can be used for the RestClient (eventually).
+ * TODO - Extract out top for all of this so that other implementations are easy to implement (e.g. gRPC).
+ */
+public class HttpClient implements RestClient {
+  private final String baseUrl;
+
+  // Need their own Builders?
+  private final CloseableHttpClient httpClient;
+  private final Consumer<CloseableHttpResponse> defaultErrorHandler;
+  private final ObjectMapper mapper;
+  private Map<String, String> additionalHeaders = Maps.newHashMap();
+  private Consumer<HttpUriRequest> requestInterceptor = (r) -> { };
+
+  private HttpClient(
+      String baseUrl, CloseableHttpClient httpClient, ObjectMapper mapper,
+      Map<String, String> additionalHeaders, Consumer<HttpUriRequest> requestInterceptor,
+      Consumer<CloseableHttpResponse> defaultErrorHandler) {
+    this.baseUrl = baseUrl;
+    this.httpClient = httpClient != null ? httpClient : HttpClients.createDefault();
+    this.mapper = mapper != null ? mapper : new ObjectMapper();
+    this.additionalHeaders = additionalHeaders != null ? additionalHeaders : Maps.newHashMap();
+    // TODO - Best way to handle Preconditions here.
+    this.requestInterceptor = requestInterceptor;
+    this.defaultErrorHandler = defaultErrorHandler;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Method to execute an HTTP request and process the corresponding response.
+   *
+   * @param method - HTTP method, such as GET, POST, HEAD, etc.
+   * @param path - URL path to send the request to
+   * @param body - Contents of the request body.
+   * @param responseType - Class of the Response type. Needs to have serializer registered with ObjectMapper
+   * @param errorHandler - Error handler delegated for HTTP responses which handles server error responses
+   * @param <T> - Class type of the response for deserialization. Must be registered with the ObjectMapper.
+   * @return The Response entity, parsed and converted to its type T
+   * @throws UncheckedIOException - Shouldn't throw this as the requestInterceptor should handle expected cases.
+   * @throws UncheckedRestException - Wraps exceptions to avoid having to use checked exceptions everywhere.
+   */
+  @Nullable
+  public <T> T execute(
+      Method method, String path, Object body, Class<T> responseType,
+      Consumer<CloseableHttpResponse> errorHandler) {
+    HttpUriRequestBase request = new HttpUriRequestBase(
+        method.name(), URI.create(String.format("%s/%s", baseUrl, path)));
+    addRequestHeaders(request);
+
+    if (body != null) {
+      try {
+        request.setEntity(new StringEntity(mapper.writeValueAsString(body)));
+      } catch (JsonProcessingException e) {
+        throw new UncheckedRestException(e, "Failed to write request body");
+      }
+    }
+
+    requestInterceptor.accept(request);
+
+    try (CloseableHttpResponse response = httpClient.execute(request)) {
+      if (response.getCode() != HttpStatus.SC_OK) {
+        errorHandler.accept(response);

Review comment:
   We can use whatever we want as long as the license is compatible. That said, if the library is already included with Spark and Flink, we generally prefer to use it so we don't need to shade and relocate it. That's why Apache HTTP is most often used.
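For reference, a call site for `execute` might look like the following sketch; the `"namespaces"` route is an assumption, since the REST paths are not defined in this excerpt, and the request is taken as a parameter because its construction (private constructors, builder) is not settled here:

```java
import org.apache.hc.core5.http.Method;
import org.apache.iceberg.rest.CreateNamespaceRequest;
import org.apache.iceberg.rest.CreateNamespaceResponse;
import org.apache.iceberg.rest.http.ErrorHandlers;
import org.apache.iceberg.rest.http.HttpClient;

class ExecuteUsageSketch {
  // POSTs the request body to an assumed "namespaces" route and parses the
  // response with the client's ObjectMapper; server errors are delegated to
  // the namespace-level error handler.
  static CreateNamespaceResponse createNamespace(HttpClient client, CreateNamespaceRequest request) {
    return client.execute(
        Method.POST, "namespaces", request,
        CreateNamespaceResponse.class, ErrorHandlers.databaseErrorHandler());
  }
}
```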
##########
File path: core/src/main/java/org/apache/iceberg/rest/http/RequestResponseSerializers.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest.http;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.IOException;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderParser;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+
+public class RequestResponseSerializers {
+
+  private RequestResponseSerializers() {
+  }
+
+  public static void registerAll(ObjectMapper mapper) {
+    SimpleModule module = new SimpleModule();
+    module
+        .addSerializer(TableIdentifier.class, new RequestResponseSerializers.TableIdentifierSerializer())
+        .addDeserializer(TableIdentifier.class, new RequestResponseSerializers.TableIdentifierDeserializer())
+        .addSerializer(Namespace.class, new RequestResponseSerializers.NamespaceSerializer())
+        .addDeserializer(Namespace.class, new RequestResponseSerializers.NamespaceDeserializer())
+        .addSerializer(Schema.class, new RequestResponseSerializers.SchemaSerializer())
+        .addDeserializer(Schema.class, new RequestResponseSerializers.SchemaDeserializer())
+        .addSerializer(PartitionSpec.class, new RequestResponseSerializers.PartitionSpecSerializer())
+        .addDeserializer(PartitionSpec.class, new RequestResponseSerializers.PartitionSpecDeserializer())
+        .addSerializer(SortOrder.class, new RequestResponseSerializers.SortOrderSerializer())
+        .addDeserializer(SortOrder.class, new RequestResponseSerializers.SortOrderDeserializer())
+        .addSerializer(TableMetadata.class, new RequestResponseSerializers.TableMetadataSerializer());
+    mapper.registerModule(module);
+  }
+
+  public static class NamespaceDeserializer extends JsonDeserializer<Namespace> {
+    @Override
+    public Namespace deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      return Namespace.of(jsonNode.asText().split("\\."));
+    }
+  }
+
+  public static class NamespaceSerializer extends JsonSerializer<Namespace> {
+    @Override
+    public void serialize(Namespace namespace, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+      gen.writeString(namespace.toString());
+    }
+  }
+
+  public static class TableIdentifierDeserializer extends JsonDeserializer<TableIdentifier> {
+    @Override
+    public TableIdentifier deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      return TableIdentifier.parse(jsonNode.asText());
+    }
+  }
+
+  public static class TableIdentifierSerializer extends JsonSerializer<TableIdentifier> {
+    @Override
+    public void serialize(TableIdentifier identifier, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      gen.writeString(identifier.toString());
+    }
+  }
+
+  public static class SchemaDeserializer extends JsonDeserializer<Schema> {
+    @Override
+    public Schema deserialize(JsonParser p, DeserializationContext context)
+        throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      Schema schema = SchemaParser.fromJson(jsonNode);
+      context.setAttribute("schema", schema);
+      return schema;
+    }
+  }
+
+  public static class SchemaSerializer extends JsonSerializer<Schema> {
+    @Override
+    public void serialize(Schema schema, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      SchemaParser.toJson(schema, gen);
+    }
+  }
+
+  public static class PartitionSpecSerializer extends JsonSerializer<PartitionSpec> {
+    @Override
+    public void serialize(PartitionSpec partitionSpec, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      PartitionSpecParser.toJson(partitionSpec, gen);
+    }
+  }
+
+  public static class PartitionSpecDeserializer extends JsonDeserializer<PartitionSpec> {
+    @Override
+    public PartitionSpec deserialize(JsonParser p, DeserializationContext context)
+        throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+
+      Schema schema = (Schema) context.getAttribute("schema");
+
+      return PartitionSpecParser.fromJson(schema, jsonNode);
+    }
+  }
+
+  public static class SortOrderSerializer extends JsonSerializer<SortOrder> {
+    @Override
+    public void serialize(SortOrder sortOrder, JsonGenerator gen, SerializerProvider serializers)
+        throws IOException {
+      SortOrderParser.toJson(sortOrder, gen);
+    }
+  }
+
+  public static class SortOrderDeserializer extends JsonDeserializer<SortOrder> {
+    @Override
+    public SortOrder deserialize(JsonParser p, DeserializationContext context)
+        throws IOException {
+      JsonNode jsonNode = p.getCodec().readTree(p);
+      Schema schema = (Schema) context.getAttribute("schema");
+
+      return SortOrderParser.fromJson(schema, jsonNode);
+    }
+  }
+
+  public static class TableMetadataSerializer extends JsonSerializer<TableMetadata> {

Review comment:
   Yeah, there's no need to only read from a metadata file.
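A sketch of how these serializers would be wired in and used; note that the dotted-string encoding in `NamespaceDeserializer` (split on `"\\."`) implies namespace levels and table names cannot themselves contain dots:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.http.RequestResponseSerializers;

class SerializersUsageSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    RequestResponseSerializers.registerAll(mapper);

    // Namespaces round-trip as dotted JSON strings, e.g. "db.schema".
    String json = mapper.writeValueAsString(Namespace.of("db", "schema"));
    Namespace ns = mapper.readValue(json, Namespace.class);

    // Table identifiers round-trip the same way via TableIdentifier.parse.
    String tableJson = mapper.writeValueAsString(TableIdentifier.of("db", "tbl"));
    TableIdentifier id = mapper.readValue(tableJson, TableIdentifier.class);
  }
}
```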
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
