This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch deletePush
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 64a732dce4cba2db4217368f9a3f5cde52d0e729
Author: Jennifer Dai <[email protected]>
AuthorDate: Mon Jun 24 13:34:20 2019 -0700

    Adding util methods for controller
---
 .../common/utils/FileUploadDownloadClient.java     | 39 ++++++++++++++++++
 .../apache/pinot/hadoop/job/ControllerRestApi.java |  9 +++++
 .../pinot/hadoop/job/DefaultControllerRestApi.java | 47 ++++++++++++++++++++++
 3 files changed, 95 insertions(+)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index de8c76a..4cf3210 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,8 +26,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.URLEncoder;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable {
 
   public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes
   public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 
seconds
+  public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 
seconds
 
   private static final String HTTP = "http";
   private static final String HTTPS = "https";
   private static final String SCHEMA_PATH = "/schemas";
   private static final String OLD_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_PATH = "/v2/segments";
+  private static final String DELETE_SEGMENT_PATH = "/segments";
   private static final String SEGMENT_METADATA_PATH = "/segmentmetadata";
   private static final String TABLES_PATH = "/tables";
   private static final String TYPE_DELIMITER = "?type=";
@@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable 
{
     return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName);
   }
 
+  public static URI getDeleteSegmentHttpUri(String host, int port, String 
rawTableName, String segmentName,
+      String tableType)
+      throws URISyntaxException, UnsupportedEncodingException {
+    return getURI(HTTP, host, port, DELETE_SEGMENT_PATH + "/" + rawTableName + 
"/" + URLEncoder.encode(segmentName, "UTF-8") + "?type=" + tableType);
+  }
+
+  public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int 
port, String rawTableName, String tableType) throws URISyntaxException {
+    return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + 
"?type=" + tableType);
+  }
+
   public static URI getRetrieveSchemaHttpURI(String host, int port, String 
schemaName)
       throws URISyntaxException {
     return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName);
@@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements 
Closeable {
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest getDeleteFileRequest(String method, URI uri, 
ContentBody contentBody,
+      @Nullable List<Header> headers, @Nullable List<NameValuePair> 
parameters, int socketTimeoutMs) {
+    // Build the Http entity
+    HttpEntity entity = 
MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE)
+        .addPart(contentBody.getFilename(), contentBody).build();
+
+    // Build the request
+    RequestBuilder requestBuilder =
+        
RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity);
+    addHeadersAndParameters(requestBuilder, headers, parameters);
+    setTimeout(requestBuilder, socketTimeoutMs);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest constructGetRequest(URI uri) {
     RequestBuilder requestBuilder = 
RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
     setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS);
     return requestBuilder.build();
   }
 
+  private static HttpUriRequest constructDeleteRequest(URI uri) {
+    RequestBuilder requestBuilder = 
RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1);
+    setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS);
+    return requestBuilder.build();
+  }
+
   private static HttpUriRequest getAddSchemaRequest(URI uri, String 
schemaName, File schemaFile) {
     return getUploadFileRequest(HttpPost.METHOD_NAME, uri, 
getContentBody(schemaName, schemaFile), null, null,
         DEFAULT_SOCKET_TIMEOUT_MS);
@@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable 
{
     return sendRequest(constructGetRequest(uri));
   }
 
+  public SimpleHttpResponse sendDeleteRequest(URI uri)
+      throws IOException, HttpErrorStatusException {
+    return sendRequest(constructDeleteRequest(uri));
+  }
+
   /**
    * Add schema.
    *
diff --git 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
index 319b55b..79f00d5 100644
--- 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
+++ 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java
@@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable {
   void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths);
 
   void sendSegmentUris(List<String> segmentUris);
+
+  /**
+   * Delete extra segments after push during REFRESH use cases. Also used in 
APPEND use cases where
+   * a day that has been re-pushed has extra segments.
+   * @param segmentUris
+   */
+  void deleteExtraSegmentUris(List<String> segmentUris);
+
+  List<String> getAllSegments(String tableType);
 }
diff --git 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
index 089ea6e..aec06eb 100644
--- 
a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
+++ 
b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java
@@ -135,8 +135,55 @@ public class DefaultControllerRestApi implements 
ControllerRestApi {
   }
 
   @Override
+  public void deleteExtraSegmentUris(List<String> segmentUris) {
+    LOGGER.info("Start deleting segment URIs: {} to locations: {}", 
segmentUris, _pushLocations);
+    for (String segmentUri : segmentUris) {
+      for (PushLocation pushLocation : _pushLocations) {
+        LOGGER.info("Sending deleting segment URI: {} to location: {}", 
segmentUri, pushLocation);
+        try {
+          SimpleHttpResponse response = 
_fileUploadDownloadClient.sendDeleteRequest(
+              
FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), 
pushLocation.getPort(), _rawTableName,
+              segmentUri, "OFFLINE"));
+          LOGGER.info("Response {}: {}", response.getStatusCode(), 
response.getResponse());
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while deleting segment URI: {} to 
location: {}", segmentUri, pushLocation, e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public List<String> getAllSegments(String tableType) {
+    LOGGER.info("Getting all segments");
+    for (PushLocation pushLocation : _pushLocations) {
+      try {
+        SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest(
+            
FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(),
 pushLocation.getPort(),
+                _rawTableName, tableType));
+        JsonNode segmentList = 
getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType);
+        return segmentList.findValuesAsText(tableType);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while getting all {} segments for table: 
{} from push location: {}", tableType, _rawTableName,
+            pushLocation, e);
+      }
+    }
+    String errorMessage =
+        String.format("Failed to get a list of all segments from push 
locations: %s for table: %s", _pushLocations,
+            _rawTableName);
+    LOGGER.error(errorMessage);
+    throw new RuntimeException(errorMessage);
+
+  }
+
+  @Override
   public void close()
       throws IOException {
     _fileUploadDownloadClient.close();
   }
+
+  private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type)
+      throws Exception {
+    return JsonUtils.stringToJsonNode(json).get(0).get(type);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to