This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3755e83 Add touch method in PinotFS; Call touch when moving deleted
segments. (#3684)
3755e83 is described below
commit 3755e839a0c73e368f9e07a26f61f7e687864998
Author: Jialiang Li <[email protected]>
AuthorDate: Tue Jan 15 17:47:04 2019 -0800
Add touch method in PinotFS; Call touch when moving deleted segments.
(#3684)
---
.../org/apache/pinot/filesystem/AzurePinotFS.java | 11 +++++++++
.../helix/core/SegmentDeletionManager.java | 5 +++-
.../org/apache/pinot/filesystem/LocalPinotFS.java | 9 +++++++
.../java/org/apache/pinot/filesystem/PinotFS.java | 10 +++++++-
.../apache/pinot/filesystem/LocalPinotFSTest.java | 23 ++++++++++++++++++
.../pinot/filesystem/PinotFSFactoryTest.java | 5 ++++
.../org/apache/pinot/filesystem/HadoopPinotFS.java | 28 ++++++++++++++++------
7 files changed, 82 insertions(+), 9 deletions(-)
diff --git
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index fd25bd0..9a5f9fa 100644
---
a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++
b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -36,6 +36,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
@@ -234,4 +235,14 @@ public class AzurePinotFS extends PinotFS {
throw new RuntimeException(e);
}
}
+
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ if (!exists(uri)) {
+ _adlStoreClient.createEmptyFile(uri.getPath());
+ } else {
+ _adlStoreClient.setTimes(uri.getPath(), null, new Date());
+ }
+ return true;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index f696da2..a01341b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -185,6 +185,9 @@ public class SegmentDeletionManager {
if (pinotFS.exists(fileToMoveURI)) {
// Overwrites the file if it already exists in the target directory.
pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true);
+ // Updates last modified.
+ // Touch is needed here so that removeAgedDeletedSegments() works correctly.
+ pinotFS.touch(deletedSegmentDestURI);
LOGGER.info("Moved segment {} from {} to {}", segmentId,
fileToMoveURI.toString(), deletedSegmentDestURI.toString());
} else {
if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
@@ -248,7 +251,7 @@ public class SegmentDeletionManager {
}
}
} catch (IOException e) {
- LOGGER.error("Had trouble deleting directories",
deletedDirURI.toString());
+ LOGGER.error("Had trouble deleting directories: {}",
deletedDirURI.toString(), e.toString());
}
} else {
LOGGER.info("dataDir is not configured, won't delete any expired
segments from deleted directory.");
diff --git
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index 987125f..e060d88 100644
---
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -161,6 +161,15 @@ public class LocalPinotFS extends PinotFS {
return file.lastModified();
}
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ File file = new File(decodeURI(uri.getRawPath()));
+ if (!exists(uri)) {
+ return file.createNewFile();
+ }
+ return file.setLastModified(System.currentTimeMillis());
+ }
+
private String encodeURI(String uri) {
String encodedStr;
try {
diff --git
a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
index abcf494..2a13c77 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
@@ -137,7 +137,7 @@ public abstract class PinotFS implements Closeable {
/**
* Returns the last modified time of the file or directory
- * @param uri
+ * @param uri location of file or directory
* @return A long value representing the time the file was last modified,
measured in milliseconds since epoch
* (00:00:00 GMT, January 1, 1970) or 0L if the file does not exist or if an
I/O error occurs
* @throws Exception if uri is not valid or present
@@ -145,6 +145,14 @@ public abstract class PinotFS implements Closeable {
public abstract long lastModified(URI uri);
/**
+ * Updates the last modified time of an existing file or directory to the current time. If the file system object
+ * does not exist, creates an empty file.
+ * @param uri location of file or directory
+ * @throws IOException if the parent directory doesn't exist.
+ */
+ public abstract boolean touch(URI uri) throws IOException;
+
+ /**
* For certain filesystems, we may need to close the filesystem and do
relevant operations to prevent leaks.
* By default, this method does nothing.
* @throws IOException
diff --git
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
index 6bdf32c..8b26b62 100644
---
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
+++
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
@@ -170,6 +170,29 @@ public class LocalPinotFSTest {
localPinotFS.mkdir(firstTempDir.toURI());
Assert.assertTrue(firstTempDir.exists(), "Could not make directory " +
firstTempDir.getPath());
+ // Check that touching a file works
+ File nonExistingFile = new File(_absoluteTmpDirPath, "nonExistingFile");
+ Assert.assertFalse(nonExistingFile.exists());
+ localPinotFS.touch(nonExistingFile.toURI());
+ Assert.assertTrue(nonExistingFile.exists());
+ long currentTime = System.currentTimeMillis();
+ Assert.assertTrue(localPinotFS.lastModified(nonExistingFile.toURI()) <
currentTime);
+ Thread.sleep(1000L);
+ // update last modified.
+ localPinotFS.touch(nonExistingFile.toURI());
+ Assert.assertTrue(localPinotFS.lastModified(nonExistingFile.toURI()) >
currentTime);
+ FileUtils.deleteQuietly(nonExistingFile);
+
+ // Check that touching a file in a directory that doesn't exist throws an exception.
+ File nonExistingFileUnderNonExistingDir = new File(_absoluteTmpDirPath,
"nonExistingDir/nonExistingFile");
+ Assert.assertFalse(nonExistingFileUnderNonExistingDir.exists());
+ try {
+ localPinotFS.touch(nonExistingFileUnderNonExistingDir.toURI());
+ Assert.fail("Touch method should throw an IOException");
+ } catch (IOException e) {
+ // Expected.
+ }
+
// Check that directory only copy worked
localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));
diff --git
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
index 131c4b6..a0b6f16 100644
---
a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
+++
b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
@@ -124,5 +124,10 @@ public class PinotFSFactoryTest {
public long lastModified(URI uri) {
return 0L;
}
+
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ return true;
+ }
}
}
diff --git
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index 97aae85..9a88114 100644
---
a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++
b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
@@ -80,9 +81,7 @@ public class HadoopPinotFS extends PinotFS {
@Override
public boolean delete(URI segmentUri, boolean forceDelete) throws
IOException {
// Returns false if we are deleting a directory and that directory is not empty
- if (isDirectory(segmentUri)
- && listFiles(segmentUri, false).length > 0
- && !forceDelete) {
+ if (isDirectory(segmentUri) && listFiles(segmentUri, false).length > 0 &&
!forceDelete) {
return false;
}
return _hadoopFS.delete(new Path(segmentUri), true);
@@ -107,7 +106,8 @@ public class HadoopPinotFS extends PinotFS {
RemoteIterator<LocatedFileStatus> sourceFiles =
_hadoopFS.listFiles(source, true);
if (sourceFiles != null) {
while (sourceFiles.hasNext()) {
- boolean succeeded = FileUtil.copy(_hadoopFS,
sourceFiles.next().getPath(), _hadoopFS, target, true, _hadoopConf);
+ boolean succeeded =
+ FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS,
target, true, _hadoopConf);
if (!succeeded) {
return false;
}
@@ -196,15 +196,29 @@ public class HadoopPinotFS extends PinotFS {
}
}
- private void authenticate(org.apache.hadoop.conf.Configuration hadoopConf,
org.apache.commons.configuration.Configuration configs) {
+ @Override
+ public boolean touch(URI uri) throws IOException {
+ Path path = new Path(uri);
+ if (!exists(uri)) {
+ FSDataOutputStream fos = _hadoopFS.create(path);
+ fos.close();
+ } else {
+ _hadoopFS.setTimes(path, System.currentTimeMillis(), -1);
+ }
+ return true;
+ }
+
+ private void authenticate(org.apache.hadoop.conf.Configuration hadoopConf,
+ org.apache.commons.configuration.Configuration configs) {
String principal = configs.getString(PRINCIPAL);
String keytab = configs.getString(KEYTAB);
if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) {
UserGroupInformation.setConfiguration(hadoopConf);
if (UserGroupInformation.isSecurityEnabled()) {
try {
- if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials()
- ||
!UserGroupInformation.getCurrentUser().getUserName().equals(principal)) {
+ if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials()
|| !UserGroupInformation.getCurrentUser()
+ .getUserName()
+ .equals(principal)) {
LOGGER.info("Trying to authenticate user [%s] with keytab [%s]..",
principal, keytab);
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]