amogh-jahagirdar commented on code in PR #8091:
URL: https://github.com/apache/iceberg/pull/8091#discussion_r1267333528


##########
core/src/main/java/org/apache/iceberg/rest/BaseRESTClient.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.apache.hc.core5.http.impl.EnglishReasonPhraseCatalog;
+import org.apache.hc.core5.net.URIBuilder;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An HttpClient for usage with the REST catalog. */
+public abstract class BaseRESTClient implements RESTClient {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseRESTClient.class);
+
+  private String uri;
+  private ObjectMapper mapper;
+
+  protected BaseRESTClient() {}
+
+  @Override
+  public void initialize(
+      String baseUri,
+      ObjectMapper objectMapper,
+      Map<String, String> baseHeaders,
+      Map<String, String> properties) {
+    this.uri = baseUri;
+    this.mapper = objectMapper;
+  }
+
+  /**
+   * Method to execute an HTTP request and process the corresponding response.
+   *
+   * @param method - HTTP method, such as GET, POST, HEAD, etc.
+   * @param queryParams - A map of query parameters
+   * @param path - URL path to send the request to
+   * @param requestBody - Content to place in the request body
+   * @param responseType - Class of the Response type. Needs to have 
serializer registered with
+   *     ObjectMapper
+   * @param errorHandler - Error handler delegated for HTTP responses which 
handles server error
+   *     responses
+   * @param <T> - Class type of the response for deserialization. Must be 
registered with the
+   *     ObjectMapper.
+   * @return The response entity, parsed and converted to its type T
+   */
+  private <T> T execute(
+      Method method,
+      String path,
+      Map<String, String> queryParams,
+      Object requestBody,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(
+        method, path, queryParams, requestBody, responseType, headers, 
errorHandler, h -> {});
+  }
+
+  /**
+   * Method to execute an HTTP request and process the corresponding response.
+   *
+   * @param method - HTTP method, such as GET, POST, HEAD, etc.
+   * @param queryParams - A map of query parameters
+   * @param path - URL path to send the request to
+   * @param requestBody - Content to place in the request body
+   * @param responseType - Class of the Response type. Needs to have 
serializer registered with
+   *     ObjectMapper
+   * @param errorHandler - Error handler delegated for HTTP responses which 
handles server error
+   *     responses
+   * @param responseHeaders The consumer of the response headers
+   * @param <T> - Class type of the response for deserialization. Must be 
registered with the
+   *     ObjectMapper.
+   * @return The response entity, parsed and converted to its type T
+   */
+  protected abstract <T> T execute(
+      Method method,
+      String path,
+      Map<String, String> queryParams,
+      Object requestBody,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler,
+      Consumer<Map<String, String>> responseHeaders);
+
+  @Override
+  public void head(String path, Map<String, String> headers, 
Consumer<ErrorResponse> errorHandler) {
+    execute(Method.HEAD, path, null, null, null, headers, errorHandler);
+  }
+
+  @Override
+  public <T extends RESTResponse> T get(
+      String path,
+      Map<String, String> queryParams,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.GET, path, queryParams, null, responseType, headers, 
errorHandler);
+  }
+
+  @Override
+  public <T extends RESTResponse> T post(
+      String path,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.POST, path, null, body, responseType, headers, 
errorHandler);
+  }
+
+  @Override
+  public <T extends RESTResponse> T post(
+      String path,
+      RESTRequest body,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler,
+      Consumer<Map<String, String>> responseHeaders) {
+    return execute(
+        Method.POST, path, null, body, responseType, headers, errorHandler, 
responseHeaders);
+  }
+
+  @Override
+  public <T extends RESTResponse> T delete(
+      String path,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.DELETE, path, null, null, responseType, headers, 
errorHandler);
+  }
+
+  @Override
+  public <T extends RESTResponse> T delete(
+      String path,
+      Map<String, String> queryParams,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.DELETE, path, queryParams, null, responseType, 
headers, errorHandler);
+  }
+
+  @Override
+  public <T extends RESTResponse> T postForm(
+      String path,
+      Map<String, String> formData,
+      Class<T> responseType,
+      Map<String, String> headers,
+      Consumer<ErrorResponse> errorHandler) {
+    return execute(Method.POST, path, null, formData, responseType, headers, 
errorHandler);
+  }
+
+  // Per the spec, the only currently defined / used "success" responses are 
200 and 202.
+  protected static boolean isSuccessful(int code) {
+    return code == HttpStatus.SC_OK
+        || code == HttpStatus.SC_ACCEPTED
+        || code == HttpStatus.SC_NO_CONTENT;
+  }
+
+  protected static ErrorResponse buildDefaultErrorResponse(int code, String 
responseReason) {
+    String message =
+        responseReason != null && !responseReason.isEmpty()
+            ? responseReason
+            : EnglishReasonPhraseCatalog.INSTANCE.getReason(code, null /* 
ignored */);
+    String type = "RESTException";
+    return 
ErrorResponse.builder().responseCode(code).withMessage(message).withType(type).build();
+  }
+
+  // Process a failed response through the provided errorHandler, and throw a 
RESTException if the
+  // provided error handler doesn't already throw.
+  protected static void throwFailure(
+      int code, String responseBody, String reason, Consumer<ErrorResponse> 
errorHandler) {
+    ErrorResponse errorResponse = null;
+
+    if (responseBody != null) {
+      try {
+        if (errorHandler instanceof ErrorHandler) {
+          errorResponse = ((ErrorHandler) errorHandler).parseResponse(code, 
responseBody);
+        } else {
+          LOG.warn(
+              "Unknown error handler {}, response body won't be parsed",
+              errorHandler.getClass().getName());
+          errorResponse =
+              
ErrorResponse.builder().responseCode(code).withMessage(responseBody).build();
+        }
+
+      } catch (UncheckedIOException | IllegalArgumentException e) {
+        // It's possible to receive a non-successful response that isn't a 
properly defined
+        // ErrorResponse
+        // without any bugs in the server implementation. So we ignore this 
exception and build an
+        // error
+        // response for the user.
+        //
+        // For example, the connection could time out before every reaching 
the server, in which
+        // case we'll
+        // likely get a 5xx with the load balancers default 5xx response.

Review Comment:
   NIt: Can you double check the formatting in the comments so that there's no 
awkward 1 word lines?



##########
core/src/main/java/org/apache/iceberg/rest/RESTObjectMapper.java:
##########
@@ -25,14 +25,14 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 
-class RESTObjectMapper {
+public class RESTObjectMapper {

Review Comment:
   If I'm not mistaken what's ultimately driving this is us injecting the 
`Mapper` into the RestClient. If we don't do that and instead keep that just 
static within the parent `RestClient` implementation I think we won't even need 
to expose this class. Is there a reason we want to pass in the 
`RestObjectMapper` to the client? (any reason to extend it/customize it?) 



##########
aws/src/test/java/org/apache/iceberg/aws/lambda/rest/TestLambdaRESTInvoker.java:
##########
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.lambda.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.ErrorHandler;
+import org.apache.iceberg.rest.HttpMethod;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.RESTClientProperties;
+import org.apache.iceberg.rest.RESTObjectMapper;
+import org.apache.iceberg.rest.RESTRequest;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.apache.iceberg.rest.responses.ErrorResponseParser;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+import software.amazon.awssdk.services.lambda.model.InvokeRequest;
+import software.amazon.awssdk.services.lambda.model.InvokeResponse;
+
+public class TestLambdaRESTInvoker {
+
+  private static final int PORT = 1080;
+  private static final String BEARER_AUTH_TOKEN = "auth_token";
+  private static final String URI = String.format("http://127.0.0.1:%d";, PORT);
+  private static final String FUNCTION_ARN = "some-arn";
+  private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+
+  private static Map<String, String> baseHeaders;
+  private static LambdaClient lambda;
+  private static RESTClient restClient;
+
+  @BeforeAll
+  public static void beforeClass() {
+    lambda = mock(LambdaClient.class);
+    String icebergBuildGitCommitShort = IcebergBuild.gitCommitShortId();
+    String icebergBuildFullVersion = IcebergBuild.fullVersion();
+    baseHeaders =
+        ImmutableMap.of(
+            "Authorization",
+            "Bearer " + BEARER_AUTH_TOKEN,
+            RESTClientProperties.CLIENT_VERSION_HEADER,
+            icebergBuildFullVersion,
+            RESTClientProperties.CLIENT_GIT_COMMIT_SHORT_HEADER,
+            icebergBuildGitCommitShort);
+    restClient = new LambdaRESTInvoker(URI, MAPPER, baseHeaders, lambda, 
FUNCTION_ARN);
+  }
+
+  @Test
+  public void testPostSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.POST);
+  }
+
+  @Test
+  public void testPostFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.POST);
+  }
+
+  @Test
+  public void testGetSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.GET);
+  }
+
+  @Test
+  public void testGetFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.GET);
+  }
+
+  @Test
+  public void testDeleteSuccess() throws Exception {
+    testHttpMethodOnSuccess(HttpMethod.DELETE);
+  }
+
+  @Test
+  public void testDeleteFailure() throws Exception {
+    testHttpMethodOnFailure(HttpMethod.DELETE);
+  }
+
+  @Test
+  public void testHeadSuccess() throws JsonProcessingException {
+    testHttpMethodOnSuccess(HttpMethod.HEAD);
+  }
+
+  @Test
+  public void testHeadFailure() throws JsonProcessingException {
+    testHttpMethodOnFailure(HttpMethod.HEAD);
+  }
+
+  public static void testHttpMethodOnSuccess(HttpMethod method) throws 
JsonProcessingException {
+    Item body = new Item(0L, "hank");
+    int statusCode = 200;
+
+    ErrorHandler onError = mock(ErrorHandler.class);
+    doThrow(new RuntimeException("Failure 
response")).when(onError).accept(any());
+
+    String path = addRequestTestCaseAndGetPath(method, body, statusCode);
+
+    Item successResponse =
+        doExecuteRequest(method, path, body, onError, h -> 
assertThat(h).isNotEmpty());
+
+    if (method.usesRequestBody()) {
+      Assertions.assertThat(body)
+          .as("On a successful " + method + ", the correct response body 
should be returned")
+          .isEqualTo(successResponse);
+    }
+
+    verify(onError, never()).accept(any());
+  }
+
+  public static void testHttpMethodOnFailure(HttpMethod method) throws 
JsonProcessingException {
+    Item body = new Item(0L, "hank");
+    int statusCode = 404;
+
+    ErrorHandler onError = mock(ErrorHandler.class);
+    doThrow(
+            new RuntimeException(
+                String.format(
+                    "Called error handler for method %s due to status code: 
%d",
+                    method, statusCode)))
+        .when(onError)
+        .accept(any());
+
+    String path = addRequestTestCaseAndGetPath(method, body, statusCode);
+
+    Assertions.assertThatThrownBy(() -> doExecuteRequest(method, path, body, 
onError, h -> {}))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessage(
+            String.format(
+                "Called error handler for method %s due to status code: %d", 
method, statusCode));
+
+    verify(onError).accept(any());
+  }
+
+  // Adds a request that the mock-server can match against, based on the 
method, path, body, and
+  // headers.
+  // Return the path generated for the test case, so that the client can call 
that path to exercise
+  // it.
+  private static String addRequestTestCaseAndGetPath(HttpMethod method, Item 
body, int statusCode)
+      throws JsonProcessingException {
+
+    // Build the path route, which must be unique per test case.
+    boolean isSuccess = statusCode == 200;

Review Comment:
   It can be 202 as well right? I think right now it's not surfacing in the 
tests just because we're not explicitly testing those cases but just to make 
this helper bit more robust.



##########
aws/src/main/java/org/apache/iceberg/aws/lambda/rest/LambdaRESTInvoker.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.lambda.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.function.Consumer;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpStatus;
+import org.apache.hc.core5.http.Method;
+import org.apache.iceberg.exceptions.RESTException;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.BaseRESTClient;
+import org.apache.iceberg.rest.RESTUtil;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.services.lambda.LambdaClient;
+import software.amazon.awssdk.services.lambda.model.InvokeRequest;
+import software.amazon.awssdk.services.lambda.model.InvokeResponse;
+
+/**
+ * A client to fulfill REST catalog request and response using AWS Lambda's 
Invoke API.
+ *
+ * <p>This is useful for situations where an HTTP connection cannot be 
established against a REST
+ * endpoint. For example, when the endpoint is in an isolated private subnet, 
a Lambda can be placed
+ * within the subnet as a proxy for communication. See {@link 
LambdaRESTRequest} and {@link
+ * LambdaRESTResponse} for request-response contract
+ */
+public class LambdaRESTInvoker extends BaseRESTClient {
+
+  private LambdaClient lambda;
+  private String functionArn;
+  private Map<String, String> baseHeaders;
+
+  public LambdaRESTInvoker() {}
+
+  @VisibleForTesting
+  LambdaRESTInvoker(
+      String baseUri,
+      ObjectMapper objectMapper,
+      Map<String, String> baseHeaders,
+      LambdaClient lambda,
+      String functionArn) {
+    super.initialize(baseUri, objectMapper, baseHeaders, ImmutableMap.of());
+    this.baseHeaders = baseHeaders;
+    this.lambda = lambda;
+    this.functionArn = functionArn;
+  }
+
+  @Override
+  protected <T> T execute(
+      Method method,
+      String path,
+      Map<String, String> queryParams,
+      Object requestBody,
+      Class<T> responseType,
+      Map<String, String> inputHeaders,
+      Consumer<ErrorResponse> errorHandler,
+      Consumer<Map<String, String>> responseHeaders) {
+    validateInputPath(path);
+    URI uri = buildUri(path, queryParams);
+
+    Map<String, String> headers;
+    String entity = null;
+    if (requestBody instanceof Map) {
+      headers = requestHeaders(inputHeaders, 
ContentType.APPLICATION_FORM_URLENCODED.getMimeType());
+      entity = RESTUtil.encodeFormData((Map<?, ?>) requestBody);
+    } else if (requestBody != null) {
+      headers = requestHeaders(inputHeaders, 
ContentType.APPLICATION_JSON.getMimeType());
+      entity = super.toJsonString(requestBody);
+    } else {
+      headers = requestHeaders(inputHeaders, 
ContentType.APPLICATION_JSON.getMimeType());
+    }
+
+    LambdaRESTRequest request =
+        ImmutableLambdaRESTRequest.builder()
+            .method(method.name())
+            .uri(uri)
+            .headers(headers)
+            .entity(entity)
+            .build();
+
+    InvokeResponse lambdaResponse;
+    try {
+      lambdaResponse =
+          lambda.invoke(
+              InvokeRequest.builder()
+                  .functionName(functionArn)
+                  
.payload(SdkBytes.fromInputStream(LambdaRESTRequestParser.toJsonStream(request)))
+                  .build());
+    } catch (AwsServiceException e) {
+      throw new RESTException(e, "Error occurred while processing %s request", 
method);
+    }
+
+    LambdaRESTResponse response =
+        
LambdaRESTResponseParser.fromJsonStream(lambdaResponse.payload().asInputStream());
+    responseHeaders.accept(response.headers());
+
+    if (response.code() == HttpStatus.SC_NO_CONTENT
+        || (responseType == null && isSuccessful(response.code()))) {
+      return null;
+    }
+
+    if (!isSuccessful(response.code())) {
+      throwFailure(response.code(), response.entity(), response.reason(), 
errorHandler);
+    }
+
+    if (response.entity() == null) {
+      throw new RESTException(
+          "Invalid (null) response body for request (expected %s): method=%s, 
path=%s, status=%d",
+          responseType.getSimpleName(), method.name(), path, response.code());
+    }
+
+    return parseResponse(response.entity(), responseType, response.code());
+  }
+
+  @Override
+  public void close() throws IOException {
+    lambda.close();
+  }
+
+  @Override
+  public void initialize(
+      String baseUri,
+      ObjectMapper objectMapper,
+      Map<String, String> headers,
+      Map<String, String> properties) {
+    super.initialize(baseUri, objectMapper, headers, properties);
+    LambdaRESTInvokerProperties lambdaRESTInvokerProperties =
+        new LambdaRESTInvokerProperties(properties);
+    this.functionArn = lambdaRESTInvokerProperties.functionArn();
+    this.lambda = 
LambdaRESTInvokerAwsClientFactories.from(properties).lambda();
+    this.baseHeaders = headers;
+    // TODO: support sigv4 signer

Review Comment:
   Can we create an issue for better tracking?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to