This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch uri_encoding in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 403f5b75bd45cd4f54a0cdcd04b639d1127a740d Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Thu Jul 11 15:19:33 2019 -0700 Add URIUtils class to handle URI/URL encoding/decoding Fix the double encoding bug of encoding URI without scheme "/segments/segment %" -> Without fix: "file:/segments/segment+%2525" With fix: "file:/segments/segment+%25" --- .../common/utils/FileUploadDownloadClient.java | 18 ++--- .../org/apache/pinot/common/utils/URIUtils.java | 90 +++++++++++++++++++++ .../org/apache/pinot/filesystem/LocalPinotFS.java | 91 ++++++++-------------- .../apache/pinot/common/utils/URIUtilsTest.java | 87 +++++++++++++++++++++ .../apache/pinot/controller/ControllerConf.java | 46 +---------- .../api/resources/FileUploadPathProvider.java | 5 +- .../resources/LLCSegmentCompletionHandlers.java | 22 +++--- .../api/resources/PinotSegmentRestletResource.java | 20 ++--- .../PinotSegmentUploadRestletResource.java | 60 +++++--------- .../helix/ControllerRequestURLBuilder.java | 19 ++--- .../helix/core/SegmentDeletionManager.java | 24 ++---- .../realtime/PinotLLCRealtimeSegmentManager.java | 34 ++++---- .../helix/core/realtime/SegmentCompletionTest.java | 7 +- .../tests/PinotURIUploadIntegrationTest.java | 8 +- .../tools/query/comparison/ClusterStarter.java | 4 +- 15 files changed, 296 insertions(+), 239 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index d3d02cd..da984fb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -26,16 +26,13 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; -import java.net.URLEncoder; import java.util.List; import java.util.Map; import javax.annotation.Nullable; import javax.net.ssl.SSLContext; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; @@ -101,7 +98,7 @@ public class FileUploadDownloadClient implements Closeable { private static final String SEGMENT_PATH = "/v2/segments"; private static final String SEGMENT_METADATA_PATH = "/segmentmetadata"; private static final String TABLES_PATH = "/tables"; - private static final String TYPE_DELIMITER = "?type="; + private static final String TYPE_QUERY_PREFIX = "type="; private final CloseableHttpClient _httpClient; @@ -133,14 +130,15 @@ public class FileUploadDownloadClient implements Closeable { public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName, String tableType) - throws URISyntaxException, UnsupportedEncodingException { - return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), - OLD_SEGMENT_PATH, rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + TYPE_DELIMITER + tableType)); + throws URISyntaxException { + return new URI(HTTP, null, host, port, OLD_SEGMENT_PATH + rawTableName + "/" + URIUtils.encode(segmentName), + TYPE_QUERY_PREFIX + tableType, null); } - public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException { - return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), - OLD_SEGMENT_PATH, rawTableName + TYPE_DELIMITER + tableType)); + public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, + String tableType) + throws URISyntaxException { + return new URI(HTTP, null, host, port, OLD_SEGMENT_PATH + rawTableName, TYPE_QUERY_PREFIX + tableType, null); } public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java new file mode 100644 index 0000000..7521a6b --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import java.io.File; +import java.net.URI; +import java.net.URLDecoder; +import java.net.URLEncoder; + + +public class URIUtils { + private URIUtils() { + } + + /** + * Returns the URI for the given base path and optional segments, appends the local (file) scheme to the URI if no + * scheme exists. All the segments will be encoded and appended to the base path. + */ + public static URI getUri(String basePath, String... segments) { + int length = segments.length; + String[] encodedSegments = new String[length + 1]; + encodedSegments[0] = basePath; + for (int i = 0; i < length; i++) { + encodedSegments[i + 1] = encode(segments[i]); + } + String path = String.join(File.separator, encodedSegments); + try { + URI uri = new URI(path); + if (uri.getScheme() != null) { + return uri; + } else { + return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME + ":" + path); + } + } catch (Exception e) { + throw new IllegalArgumentException("Illegal URI path: " + path, e); + } + } + + /** + * Returns the URL for the given base URL and optional segments. All the segments will be encoded and appended to the + * base URL. + */ + public static String getUrl(String baseUrl, String... segments) { + int length = segments.length; + String[] encodedSegments = new String[length + 1]; + encodedSegments[0] = baseUrl; + for (int i = 0; i < length; i++) { + encodedSegments[i + 1] = encode(segments[i]); + } + return String.join(File.separator, encodedSegments); + } + + public static String constructDownloadUrl(String baseUrl, String rawTableName, String segmentName) { + return getUrl(baseUrl, "segments", rawTableName, segmentName); + } + + public static String encode(String string) { + try { + return URLEncoder.encode(string, "UTF-8"); + } catch (Exception e) { + // Should never happen + throw new RuntimeException(e); + } + } + + public static String decode(String string) { + try { + return URLDecoder.decode(string, "UTF-8"); + } catch (Exception e) { + // Should never happen + throw new RuntimeException(e); + } + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java index 7161031..4a6e263 100644 --- a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java +++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java @@ -20,18 +20,14 @@ package org.apache.pinot.filesystem; import java.io.File; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.net.URI; -import java.net.URLDecoder; -import java.net.URLEncoder; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; import org.apache.commons.configuration.Configuration; import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.pinot.common.utils.URIUtils; /** @@ -39,11 +35,6 @@ import org.slf4j.LoggerFactory; * if access to the file is denied. */ public class LocalPinotFS extends PinotFS { - private static final Logger LOGGER = LoggerFactory.getLogger(LocalPinotFS.class); - private static final String DEFAULT_ENCODING = "UTF-8"; - - public LocalPinotFS() { - } @Override public void init(Configuration configuration) { @@ -52,14 +43,14 @@ public class LocalPinotFS extends PinotFS { @Override public boolean mkdir(URI uri) throws IOException { - FileUtils.forceMkdir(new File(decodeURI(uri.getRawPath()))); + FileUtils.forceMkdir(toFile(uri)); return true; } @Override public boolean delete(URI segmentUri, boolean forceDelete) throws IOException { - File file = new File(decodeURI(segmentUri.getRawPath())); + File file = toFile(segmentUri); if (file.isDirectory()) { // Returns false if directory isn't empty if (listFiles(segmentUri, false).length > 0 && !forceDelete) { @@ -77,8 +68,8 @@ public class LocalPinotFS extends PinotFS { @Override protected boolean doMove(URI srcUri, URI dstUri) throws IOException { - File srcFile = new File(decodeURI(srcUri.getRawPath())); - File dstFile = new File(decodeURI(dstUri.getRawPath())); + File srcFile = toFile(srcUri); + File dstFile = toFile(dstUri); if (srcFile.isDirectory()) { FileUtils.moveDirectory(srcFile, dstFile); } else { @@ -90,32 +81,18 @@ public class LocalPinotFS extends PinotFS { @Override public boolean copy(URI srcUri, URI dstUri) throws IOException { - File srcFile = new File(decodeURI(srcUri.getRawPath())); - File dstFile = new File(decodeURI(dstUri.getRawPath())); - if (dstFile.exists()) { - FileUtils.deleteQuietly(dstFile); - } - if (srcFile.isDirectory()) { - // Throws Exception on failure - FileUtils.copyDirectory(srcFile, dstFile); - } else { - // Will create parent directories, throws Exception on failure - FileUtils.copyFile(srcFile, dstFile); - } + copy(toFile(srcUri), toFile(dstUri)); return true; } @Override - public boolean exists(URI fileUri) - throws IOException { - File file = new File(decodeURI(fileUri.getRawPath())); - return file.exists(); + public boolean exists(URI fileUri) { + return toFile(fileUri).exists(); } @Override - public long length(URI fileUri) - throws IOException { - File file = new File(decodeURI(fileUri.getRawPath())); + public long length(URI fileUri) { + File file = toFile(fileUri); if (file.isDirectory()) { throw new IllegalArgumentException("File is directory"); } @@ -125,7 +102,7 @@ public class LocalPinotFS extends PinotFS { @Override public String[] listFiles(URI fileUri, boolean recursive) throws IOException { - File file = new File(decodeURI(fileUri.getRawPath())); + File file = toFile(fileUri); if (!recursive) { return Arrays.stream(file.list()).map(s -> new File(file, s)).map(File::getAbsolutePath).toArray(String[]::new); } else { @@ -137,56 +114,52 @@ public class LocalPinotFS extends PinotFS { @Override public void copyToLocalFile(URI srcUri, File dstFile) throws Exception { - copy(srcUri, new URI(encodeURI(dstFile.getAbsolutePath()))); + copy(toFile(srcUri), dstFile); } @Override public void copyFromLocalFile(File srcFile, URI dstUri) throws Exception { - copy(new URI(encodeURI(srcFile.getAbsolutePath())), dstUri); + copy(srcFile, toFile(dstUri)); } @Override public boolean isDirectory(URI uri) { - File file = new File(decodeURI(uri.getRawPath())); - return file.isDirectory(); + return toFile(uri).isDirectory(); } @Override public long lastModified(URI uri) { - File file = new File(decodeURI(uri.getRawPath())); - return file.lastModified(); + return toFile(uri).lastModified(); } @Override public boolean touch(URI uri) throws IOException { - File file = new File(decodeURI(uri.getRawPath())); - if (!exists(uri)) { + File file = toFile(uri); + if (!file.exists()) { return file.createNewFile(); } return file.setLastModified(System.currentTimeMillis()); } - private String encodeURI(String uri) { - String encodedStr; - try { - encodedStr = URLEncoder.encode(uri, DEFAULT_ENCODING); - } catch (UnsupportedEncodingException e) { - LOGGER.warn("Could not encode uri {}", uri); - throw new RuntimeException(e); - } - return encodedStr; + private static File toFile(URI uri) { + // NOTE: Do not use new File(uri) because scheme might not exist and it does not decode '+' to ' ' + // Do not use uri.getPath() because it does not decode '+' to ' ' + return new File(URIUtils.decode(uri.getRawPath())); } - private String decodeURI(String uri) { - String decodedStr; - try { - decodedStr = URLDecoder.decode(uri, DEFAULT_ENCODING); - } catch (UnsupportedEncodingException e) { - LOGGER.warn("Could not decode uri {}", uri); - throw new RuntimeException(e); + private static void copy(File srcFile, File dstFile) + throws IOException { + if (dstFile.exists()) { + FileUtils.deleteQuietly(dstFile); + } + if (srcFile.isDirectory()) { + // Throws Exception on failure + FileUtils.copyDirectory(srcFile, dstFile); + } else { + // Will create parent directories, throws Exception on failure + FileUtils.copyFile(srcFile, dstFile); } - return decodedStr; } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java new file mode 100644 index 0000000..797b3e2 --- /dev/null +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.utils; + +import java.io.File; +import java.net.URI; +import java.util.Random; +import org.apache.commons.lang3.RandomStringUtils; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class URIUtilsTest { + private static final int NUM_ROUNDS = 1000; + private static final int MAX_NUM_SEGMENTS = 10; + private static final int MAX_SEGMENT_LENGTH = 10; + private static final Random RANDOM = new Random(); + + @Test + public void testGetUri() { + for (int i = 0; i < NUM_ROUNDS; i++) { + int numSegments = RANDOM.nextInt(MAX_NUM_SEGMENTS + 1); + String[] segments = new String[numSegments]; + for (int j = 0; j < numSegments; j++) { + segments[j] = getRandomString(); + } + URI httpUri = URIUtils.getUri("http://foo/bar", segments); + assertEquals(httpUri.getScheme(), "http"); + assertEquals(httpUri.getAuthority(), "foo"); + URI fileUri = URIUtils.getUri("/foo/bar", segments); + assertEquals(fileUri.getScheme(), "file"); + if (segments.length == 0) { + assertEquals(httpUri.getRawPath(), "/bar"); + assertEquals(fileUri.getRawPath(), "/foo/bar"); + } else { + assertEquals(URIUtils.decode(httpUri.getRawPath()), "/bar/" + String.join(File.separator, segments)); + assertEquals(URIUtils.decode(fileUri.getRawPath()), "/foo/bar/" + String.join(File.separator, segments)); + } + } + } + + @Test + public void testGetUrl() { + for (int i = 0; i < NUM_ROUNDS; i++) { + int numSegments = RANDOM.nextInt(MAX_NUM_SEGMENTS + 1); + String[] segments = new String[numSegments]; + for (int j = 0; j < numSegments; j++) { + segments[j] = getRandomString(); + } + String httpUri = URIUtils.getUrl("http://foo/bar", segments); + if (segments.length == 0) { + assertEquals(httpUri, "http://foo/bar"); + } else { + assertEquals(URIUtils.decode(httpUri), "http://foo/bar/" + String.join(File.separator, segments)); + } + } + } + + @Test + public void testEncodeDecode() { + for (int i = 0; i < NUM_ROUNDS; i++) { + String randomString = getRandomString(); + assertEquals(URIUtils.decode(URIUtils.encode(randomString)), randomString); + } + } + + private static String getRandomString() { + return RandomStringUtils.random(RANDOM.nextInt(MAX_SEGMENT_LENGTH + 1)); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java index 697fcf3..4e872bb 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java @@ -19,10 +19,6 @@ package org.apache.pinot.controller; import java.io.File; -import java.io.UnsupportedEncodingException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -30,7 +26,6 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; -import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.filesystem.LocalPinotFS; import org.slf4j.Logger; @@ -59,9 +54,7 @@ public class ControllerConf extends PropertiesConfiguration { private static final String CONTROLLER_MODE = "controller.mode"; public enum ControllerMode { - DUAL, - PINOT_ONLY, - HELIX_ONLY + DUAL, PINOT_ONLY, HELIX_ONLY } public static class ControllerPeriodicTasksConf { @@ -171,43 +164,6 @@ public class ControllerConf extends PropertiesConfiguration { super(); } - /** - * Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists. - */ - public static URI getUriFromPath(String path) { - try { - URI uri = new URI(path); - if (uri.getScheme() != null) { - return uri; - } else { - return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME, path, null); - } - } catch (URISyntaxException e) { - LOGGER.error("Could not construct uri from path {}", path); - throw new RuntimeException(e); - } - } - - public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) { - try { - return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8"))); - } catch (UnsupportedEncodingException e) { - LOGGER - .error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir, - tableName, segmentName); - throw new RuntimeException(e); - } - } - - public static String constructDownloadUrl(String tableName, String segmentName, String vip) { - try { - return StringUtil.join("/", vip, "segments", tableName, URLEncoder.encode(segmentName, "UTF-8")); - } catch (UnsupportedEncodingException e) { - // Shouldn't happen - throw new AssertionError("Encountered error while encoding in UTF-8 format", e); - } - } - public void setLocalTempDir(String localTempDir) { setProperty(LOCAL_TEMP_DIR, localTempDir); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java index 055b19c..839c3da 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java @@ -24,6 +24,7 @@ import java.net.URI; import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.filesystem.LocalPinotFS; import org.apache.pinot.filesystem.PinotFS; import org.apache.pinot.filesystem.PinotFSFactory; @@ -55,7 +56,7 @@ public class FileUploadPathProvider { StringUtils.stripEnd(dataDir, "/"); try { // URIs that are allowed to be remote - _baseDataDirURI = ControllerConf.getUriFromPath(dataDir); + _baseDataDirURI = URIUtils.getUri(dataDir); LOGGER.info("Data directory: {}", _baseDataDirURI); _schemasTmpDirURI = new URI(_baseDataDirURI + SCHEMAS_TEMP); LOGGER.info("Schema temporary directory: {}", _schemasTmpDirURI); @@ -70,7 +71,7 @@ public class FileUploadPathProvider { LOGGER.info("Local temporary directory is not configured, use data directory as the local temporary directory"); _localTempDirURI = _baseDataDirURI; } else { - _localTempDirURI = ControllerConf.getUriFromPath(localTempDir); + _localTempDirURI = URIUtils.getUri(localTempDir); } LOGGER.info("Local temporary directory: {}", _localTempDirURI); if (!_localTempDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 84df735..facb42b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller.api.resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -40,8 +41,6 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; - -import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; @@ -52,6 +51,7 @@ import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.util.SegmentCompletionUtils; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.index.SegmentMetadataImpl; import org.apache.pinot.filesystem.PinotFS; @@ -155,8 +155,7 @@ public class LLCSegmentCompletionHandlers { requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason); LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString()); - SegmentCompletionProtocol.Response response = - _segmentCompletionManager.segmentStoppedConsuming(requestParams); + SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentStoppedConsuming(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}", segmentName, responseStr); return responseStr; @@ -185,8 +184,7 @@ public class LLCSegmentCompletionHandlers { LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString()); - SegmentCompletionProtocol.Response response = - _segmentCompletionManager.segmentCommitStart(requestParams); + SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentCommitStart(requestParams); final String responseStr = response.toJsonString(); LOGGER.info("Response to segmentCommitStart for segment:{} is:{}", segmentName, responseStr); return responseStr; @@ -283,8 +281,7 @@ public class LLCSegmentCompletionHandlers { try { FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf); final String rawTableName = new LLCSegmentName(segmentName).getTableName(); - URI segmentFileURI = ControllerConf.getUriFromPath( - StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, segmentName)); + URI segmentFileURI = URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, segmentName); PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme()); // Multiple threads can reach this point at the same time, if the following scenario happens // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in @@ -428,8 +425,8 @@ public class LLCSegmentCompletionHandlers { try { Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr); // Extract metadata.properties from the metadataFiles. - if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, - V1Constants.MetadataKeys.METADATA_FILE_NAME, segmentNameStr)) { + if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME, + segmentNameStr)) { return null; } // Extract creation.meta from the metadataFiles. @@ -486,7 +483,7 @@ public class LLCSegmentCompletionHandlers { File tempSegmentDataDir = new File(tempSegmentDataDirStr); File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, segmentNameStr)); // Use PinotFS to copy the segment file to local fs for metadata extraction. - PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme()); + PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme()); try { Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create directory: %s", tempSegmentDataDir); pinotFS.copyToLocalFile(segmentLocation, segDstFile); @@ -575,8 +572,7 @@ public class LLCSegmentCompletionHandlers { // See PinotLLCRealtimeSegmentManager.commitSegmentFile(). // TODO: move tmp file logic into SegmentCompletionUtils. String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName); - URI segmentFileURI = ControllerConf.getUriFromPath( - StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName)); + URI segmentFileURI = URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName); PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme()); pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI); return segmentFileURI; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 15cd6d5..6e6ecb0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -54,6 +52,7 @@ import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.common.utils.URIUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,7 +137,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @ApiParam(value = "enable|disable|drop", required = false) @QueryParam("state") String stateStr, @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) { - segmentName = checkGetEncodedParam(segmentName); + segmentName = URIUtils.decode(segmentName); // segmentName will never be null,otherwise we would reach the method toggleStateOrListMetadataForAllSegments() CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr); StateType stateType = Constants.validateState(stateStr); @@ -162,15 +161,6 @@ public class PinotSegmentRestletResource { } } - private String checkGetEncodedParam(String encoded) { - try { - return URLDecoder.decode(encoded, "UTF-8"); - } catch (UnsupportedEncodingException e) { - String errStr = "Could not decode parameter '" + encoded + "'"; - throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST); - } - } - @GET @Path("tables/{tableName}/segments/metadata") @Produces(MediaType.APPLICATION_JSON) @@ -193,7 +183,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) { - segmentName = checkGetEncodedParam(segmentName); + segmentName = URIUtils.decode(segmentName); CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr); return listSegmentMetadataInternal(tableName, segmentName, tableType); } @@ -229,7 +219,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) { - segmentName = checkGetEncodedParam(segmentName); + segmentName = URIUtils.decode(segmentName); CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr); return reloadSegmentForTable(tableName, segmentName, tableType); } @@ -254,7 +244,7 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName, @ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) { - segmentName = checkGetEncodedParam(segmentName); + segmentName = URIUtils.decode(segmentName); CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr); return reloadSegmentForTable(tableName, segmentName, tableType); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java index 5259d9f..182c67a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java @@ -28,11 +28,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.URI; -import java.net.URLDecoder; -import java.net.URLEncoder; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -57,7 +54,6 @@ import javax.ws.rs.core.Response; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.helix.ZNRecord; import org.apache.helix.model.IdealState; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metrics.ControllerMeter; @@ -67,7 +63,7 @@ import org.apache.pinot.common.segment.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.JsonUtils; -import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerLeadershipManager; @@ -185,13 +181,8 @@ public class PinotSegmentUploadRestletResource { } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } - try { - segmentName = URLDecoder.decode(segmentName, "UTF-8"); - } catch (UnsupportedEncodingException e) { - String errStr = "Could not decode segment name '" + segmentName + "'"; - throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST); - } - final File dataFile = new File(provider.getBaseDataDir(), StringUtil.join("/", tableName, segmentName)); + segmentName = URIUtils.decode(segmentName); + File dataFile = new File(new File(provider.getBaseDataDir(), tableName), segmentName); if (!dataFile.exists()) { throw new ControllerApplicationException(LOGGER, "Segment " + segmentName + " or table " + tableName + " not found", Response.Status.NOT_FOUND); @@ -214,12 +205,7 @@ public class PinotSegmentUploadRestletResource { if (tableType == null) { throw new ControllerApplicationException(LOGGER, "Table type must not be null", Response.Status.BAD_REQUEST); } - try { - segmentName = URLDecoder.decode(segmentName, "UTF-8"); - } catch (UnsupportedEncodingException e) { - String errStr = "Could not decode segment name '" + segmentName + "'"; - throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST); - } + segmentName = URIUtils.decode(segmentName); PinotSegmentRestletResource .toggleStateInternal(tableName, StateType.DROP, tableType, segmentName, _pinotHelixResourceManager); @@ -267,8 +253,6 @@ public class PinotSegmentUploadRestletResource { File tempEncryptedFile = null; File tempDecryptedFile = null; File tempSegmentDir = null; - SegmentMetadata segmentMetadata; - String zkDownloadUri = null; try { FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf); String tempFileName = TMP_DIR_PREFIX + System.nanoTime(); @@ -288,6 +272,7 @@ public class PinotSegmentUploadRestletResource { // TODO: Change when metadata upload added String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + SegmentMetadata segmentMetadata; switch (uploadType) { case URI: segmentMetadata = @@ -304,19 +289,20 @@ public class PinotSegmentUploadRestletResource { } String rawTableName = segmentMetadata.getTableName(); + String segmentName = segmentMetadata.getName(); + String zkDownloadUri; // This boolean is here for V1 segment upload, where we keep the segment in the downloadURI sent in the header. // We will deprecate this behavior eventually. if (!moveSegmentToFinalLocation) { LOGGER.info("Setting zkDownloadUri to {} for segment {} of table {}, skipping move", currentSegmentLocationURI, - segmentMetadata.getName(), rawTableName); + segmentName, rawTableName); zkDownloadUri = currentSegmentLocationURI; } else { - zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, segmentMetadata, provider); + zkDownloadUri = getZkDownloadURIForSegmentUpload(provider, rawTableName, segmentName); } String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); - String segmentName = segmentMetadata.getName(); String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName); LOGGER .info("Processing upload request for segment: {} of table: {} from client: {}", segmentName, offlineTableName, @@ -332,8 +318,7 @@ public class PinotSegmentUploadRestletResource { completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, rawTableName, segmentMetadata, segmentName, zkDownloadUri, moveSegmentToFinalLocation, segmentValidatorResponse); - return new SuccessResponse( - "Successfully uploaded segment: " + segmentMetadata.getName() + " of table: " + rawTableName); + return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + rawTableName); } catch (WebApplicationException e) { throw e; } catch (Exception e) { @@ -347,17 +332,16 @@ public class PinotSegmentUploadRestletResource { } } - private String getZkDownloadURIForSegmentUpload(String rawTableName, SegmentMetadata segmentMetadata, - FileUploadPathProvider provider) - throws UnsupportedEncodingException { - if (provider.getBaseDataDirURI().getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { - return ControllerConf.constructDownloadUrl(rawTableName, segmentMetadata.getName(), provider.getVip()); + private String getZkDownloadURIForSegmentUpload(FileUploadPathProvider provider, String rawTableName, + String segmentName) { + URI baseDataDirURI = provider.getBaseDataDirURI(); + if (baseDataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { + return URIUtils.constructDownloadUrl(provider.getVip(), rawTableName, segmentName); } else { // Receiving .tar.gz segment upload for pluggable storage - LOGGER.info("Using configured data dir {} for segment {} of table {}", _controllerConf.getDataDir(), - segmentMetadata.getName(), rawTableName); - return StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, - URLEncoder.encode(segmentMetadata.getName(), "UTF-8")); + LOGGER.info("Using configured data dir {} for segment {} of table {}", _controllerConf.getDataDir(), segmentName, + rawTableName); + return URIUtils.constructDownloadUrl(baseDataDirURI.toString(), rawTableName, segmentName); } } @@ -388,12 +372,10 @@ public class PinotSegmentUploadRestletResource { } private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile, - FileUploadPathProvider provider, String rawTableName, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI, - boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse) + FileUploadPathProvider provider, String rawTableName, SegmentMetadata segmentMetadata, String segmentName, + String zkDownloadURI, boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse) throws Exception { - String finalSegmentPath = StringUtil - .join("/", provider.getBaseDataDirURI().toString(), rawTableName, URLEncoder.encode(segmentName, "UTF-8")); - URI finalSegmentLocationURI = new URI(finalSegmentPath); + URI finalSegmentLocationURI = URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, segmentName); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, finalSegmentLocationURI, tempDecryptedFile, enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, segmentValidatorResponse); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java index 584a558..1251eb1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -18,11 +18,10 @@ */ package org.apache.pinot.controller.helix; -import java.io.IOException; -import java.net.URLEncoder; import org.apache.avro.reflect.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.common.utils.URIUtils; /** @@ -206,20 +205,16 @@ public class ControllerRequestURLBuilder { System.out.println(ControllerRequestURLBuilder.baseUrl("localhost:8089").forInstanceCreate()); } - public String forSegmentDownload(String tableName, String segmentName) - throws IOException { - return StringUtil - .join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName, URLEncoder.encode(segmentName, "UTF-8")); + public String forSegmentDownload(String tableName, String segmentName) { + return URIUtils.constructDownloadUrl(StringUtils.chomp(_baseUrl, "/"), tableName, segmentName); } public String forSegmentDelete(String resourceName, String segmentName) { return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "datafiles", resourceName, segmentName); } - public String forSegmentDeleteAPI(String tableName, String segmentName, String tableType) - throws Exception { - return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName, - URLEncoder.encode(segmentName, "UTF-8") + "?type=" + tableType); + public String forSegmentDeleteAPI(String tableName, String segmentName, String tableType) { + return URIUtils.getUrl(StringUtils.chomp(_baseUrl, "/"), "segments", tableName, segmentName) + "?type=" + tableType; } public String forSegmentDeleteAllAPI(String tableName, String tableType) @@ -244,8 +239,8 @@ public class ControllerRequestURLBuilder { public String forDeleteSegmentWithGetAPI(String tableName, String segmentName, String tableType) throws Exception { - return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", - URLEncoder.encode(segmentName, "UTF-8") + "?state=drop&" + "type=" + tableType); + return URIUtils.getUrl(StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", segmentName) + + "?state=drop&type=" + tableType; } public String forDeleteAllSegmentsWithTypeWithGetAPI(String tableName, String tableType) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index 46c84c0..73fbd44 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.controller.helix.core; -import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -41,8 +40,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.utils.SegmentName; -import org.apache.pinot.common.utils.StringUtil; -import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.filesystem.PinotFS; import org.apache.pinot.filesystem.PinotFSFactory; import org.joda.time.DateTime; @@ -174,14 +172,9 @@ public class SegmentDeletionManager { protected void removeSegmentFromStore(String tableNameWithType, String segmentId) { final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); if (_dataDir != null) { - URI fileToMoveURI; - PinotFS pinotFS; - URI dataDirURI = ControllerConf.getUriFromPath(_dataDir); - fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId); - URI deletedSegmentDestURI = ControllerConf - .constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName, - segmentId); - pinotFS = PinotFSFactory.create(dataDirURI.getScheme()); + URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, segmentId); + URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, segmentId); + PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme()); try { if (pinotFS.exists(fileToMoveURI)) { @@ -216,9 +209,8 @@ public class SegmentDeletionManager { */ public void removeAgedDeletedSegments(int retentionInDays) { if (_dataDir != null) { - URI dataDirURI = ControllerConf.getUriFromPath(_dataDir); - URI deletedDirURI = ControllerConf.getUriFromPath(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS)); - PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme()); + URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS); + PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme()); try { // Check that the directory for deleted segments exists. @@ -234,12 +226,12 @@ public class SegmentDeletionManager { } for (String tableNameDir : tableNameDirs) { - URI tableNameURI = ControllerConf.getUriFromPath(tableNameDir); + URI tableNameURI = URIUtils.getUri(tableNameDir); // Get files that are aged final String[] targetFiles = pinotFS.listFiles(tableNameURI, false); int numFilesDeleted = 0; for (String targetFile : targetFiles) { - URI targetURI = ControllerConf.getUriFromPath(targetFile); + URI targetURI = URIUtils.getUri(targetFile); Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate(); if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) { if (!pinotFS.delete(targetURI, true)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index c833261..32e7454 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -62,7 +62,6 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.SegmentName; -import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.common.utils.retry.RetryPolicies; import org.apache.pinot.controller.ControllerConf; @@ -75,6 +74,7 @@ import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegment import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; import org.apache.pinot.controller.util.SegmentCompletionUtils; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.core.realtime.segment.ConsumingSegmentAssignmentStrategy; import org.apache.pinot.core.realtime.segment.RealtimeSegmentAssignmentStrategy; import org.apache.pinot.core.realtime.stream.OffsetCriteria; @@ -148,7 +148,6 @@ public class PinotLLCRealtimeSegmentManager { _controllerLeadershipManager = controllerLeadershipManager; } - public boolean getIsSplitCommitEnabled() { return _controllerConf.getAcceptSplitCommit(); } @@ -335,11 +334,10 @@ public class PinotLLCRealtimeSegmentManager { } String segmentName = committingSegmentDescriptor.getSegmentName(); String segmentLocation = committingSegmentDescriptor.getSegmentLocation(); - URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation); - URI baseDirURI = ControllerConf.getUriFromPath(_controllerConf.getDataDir()); - URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", _controllerConf.getDataDir(), tableName)); - URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName)); - PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme()); + URI segmentFileURI = URIUtils.getUri(segmentLocation); + URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), tableName); + URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), tableName, segmentName); + PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme()); if (!isConnected() || !isLeader()) { // We can potentially log a different value than what we saw .... @@ -452,7 +450,7 @@ public class PinotLLCRealtimeSegmentManager { // Step-1 boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset, - committingSegmentDescriptor); + committingSegmentDescriptor); if (!success) { return false; } @@ -483,7 +481,7 @@ public class PinotLLCRealtimeSegmentManager { } catch (Exception e) { LOGGER.error("Caught exception when updating ideal state for {}", committingSegmentNameStr, e); return false; - } finally { + } finally { lock.unlock(); } @@ -502,8 +500,7 @@ public class PinotLLCRealtimeSegmentManager { * @return */ protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName, LLCSegmentName committingLLCSegmentName, - long nextOffset, - CommittingSegmentDescriptor committingSegmentDescriptor) { + long nextOffset, CommittingSegmentDescriptor committingSegmentDescriptor) { String committingSegmentNameStr = committingLLCSegmentName.getSegmentName(); Stat stat = new Stat(); @@ -516,8 +513,8 @@ public class PinotLLCRealtimeSegmentManager { return false; } if (committingSegmentDescriptor.getSegmentMetadata() == null) { - LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}", committingLLCSegmentName, - realtimeTableName); + LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}", + committingLLCSegmentName, realtimeTableName); return false; } SegmentMetadataImpl segmentMetadata = committingSegmentDescriptor.getSegmentMetadata(); @@ -527,7 +524,7 @@ public class PinotLLCRealtimeSegmentManager { committingSegmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName); committingSegmentMetadata.setDownloadUrl( - ControllerConf.constructDownloadUrl(rawTableName, committingSegmentNameStr, _controllerConf.generateVipUrl())); + URIUtils.constructDownloadUrl(_controllerConf.generateVipUrl(), rawTableName, committingSegmentNameStr)); committingSegmentMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc())); committingSegmentMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis()); committingSegmentMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis()); @@ -1010,8 +1007,9 @@ public class PinotLLCRealtimeSegmentManager { LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat); long metadataUpdateTime = stat.getMtime(); if (now > metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) { - LOGGER.info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}", - segmentId, now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS); + LOGGER + .info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}", segmentId, + now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS); return true; } return false; @@ -1340,8 +1338,8 @@ public class PinotLLCRealtimeSegmentManager { Map<String, String> stateMap = idealState.getInstanceStateMap(newSegmentId); if (stateMap == null) { for (String instance : newSegmentInstances) { - idealState - .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE); + idealState.setPartitionState(newSegmentId, instance, + PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index b05f9bf..486439c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerLeadershipManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -40,7 +41,6 @@ import org.testng.annotations.Test; import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus; import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -1165,9 +1165,8 @@ public class SegmentCompletionTest { public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) { _segmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE); _segmentMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset()); - _segmentMetadata.setDownloadUrl(ControllerConf - .constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(), - CONTROLLER_CONF.generateVipUrl())); + _segmentMetadata.setDownloadUrl(URIUtils.constructDownloadUrl(CONTROLLER_CONF.generateVipUrl(), rawTableName, + committingSegmentDescriptor.getSegmentName())); _segmentMetadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs()); return true; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java index 65cf969..9debc81 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.MoreExecutors; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -111,7 +110,8 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet } } - private File generateRandomSegment(String segmentName, int rowCount) throws Exception { + private File generateRandomSegment(String segmentName, int rowCount) + throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); Schema schema = new Schema.Parser() .parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc")))); @@ -267,12 +267,12 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet } } - private List<String> getAllSegments(String tablename) + private List<String> getAllSegments(String tableName) throws IOException { List<String> allSegments = new ArrayList<>(); HttpHost controllerHttpHost = new HttpHost("localhost", _controllerPort); HttpClient controllerClient = new DefaultHttpClient(); - HttpGet req = new HttpGet("/segments/" + URLEncoder.encode(tablename, "UTF-8")); + HttpGet req = new HttpGet("/segments/" + tableName); HttpResponse res = controllerClient.execute(controllerHttpHost, req); try { if (res.getStatusLine().getStatusCode() != 200) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java index 18dedc4..bc7eb7a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java @@ -25,11 +25,11 @@ import java.io.InputStreamReader; import java.net.SocketException; import java.net.URL; import java.net.URLConnection; -import java.net.URLEncoder; import java.net.UnknownHostException; import org.apache.pinot.common.config.TableConfig; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.NetUtil; +import org.apache.pinot.common.utils.URIUtils; import org.apache.pinot.controller.helix.ControllerRequestURLBuilder; import org.apache.pinot.tools.admin.command.AddTableCommand; import org.apache.pinot.tools.admin.command.CreateSegmentCommand; @@ -254,7 +254,7 @@ public class ClusterStarter { public int perfQuery(String query) throws Exception { LOGGER.debug("Running perf query on Pinot Cluster"); - String encodedQuery = URLEncoder.encode(query, "UTF-8"); + String encodedQuery = URIUtils.encode(query); String brokerUrl = _perfUrl + encodedQuery; LOGGER.info("Executing command: " + brokerUrl); URLConnection conn = new URL(brokerUrl).openConnection(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
