mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549156061



##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_WAIT_TIME_MS = 5000;

Review comment:
       ```suggestion
     private static final int DEFAULT_SLEEP_INTERVAL_MS = 200;
   ```

##########
File path: 
pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String 
tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) 
{
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + 
_inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to 
controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), 
"pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > 
DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero 
after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry 
after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is 
{}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file 
{}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports 
generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to 
TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), 
Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, 
_recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new 
SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", 
_segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + 
TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new 
URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, 
segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new 
File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      
ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
       Valid question.
   Assume that we create a table, add a segment between each of the upgrade 
phases (there are six), and delete some of them in some phases. We will 
start with 0 segments and maybe end with 2 or 3, while going up to 6.
   
   In this case, the count of online segments can be anything.
   
   So, all we want to make sure is that the given segment (segment name) is in 
the state we want it to be. If deleted, then we want to make sure that it 
has disappeared from externalview.
   
   A segment is deleted when it goes away from externalview. If it is present, 
it had better not be in ERROR state (unless we intend that). For now, let us just 
implement plain old ADD and DELETE operations and check for ONLINE state in the 
case of adding a segment, and for not being there in the case of a delete.
   
   So, the best way seems to be to get the externalview, parse it into a json 
object, and look for specific fields.
   
   And while we are there, might as well account for all replicas being in the 
same state rather than just one replica. If we add test cases involving 
replicas this will be one less thing to take care of.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to