This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch tls-integration-test
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit ec47454bbe1b81a0c91a0cc0c48b3ce73bc1e41b
Author: Alexander Pucher <[email protected]>
AuthorDate: Wed Jun 2 14:40:45 2021 -0700

    TLS & basic auth cross-integration test
---
 .../tests/BasicAuthTlsRealtimeIntegrationTest.java | 259 +++++++++++++++++++++
 .../src/test/resources/tlstest.jks                 | Bin 0 -> 4271 bytes
 2 files changed, 259 insertions(+)

diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
new file mode 100644
index 0000000..e41cf16
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BasicAuthTlsRealtimeIntegrationTest.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
+import groovy.lang.IntRange;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.client.Request;
+import org.apache.pinot.client.ResultSetGroup;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
+
+
+public class BasicAuthTlsRealtimeIntegrationTest extends 
BaseClusterIntegrationTest {
+  private final File _tempDirTls = new File(FileUtils.getTempDirectory(), 
getClass().getSimpleName() + "-cert");
+  private final File _tlsStore = 
_tempDirTls.toPath().resolve("tlsstore.jks").toFile();
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDirTls);
+
+    prepareTlsStore();
+
+    // Start Zookeeper
+    startZk();
+    // Start Pinot cluster
+    startKafka();
+    startController();
+    startBrokerHttps();
+    startServerHttps();
+    startMinion(null, null);
+
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
+    // Create and upload the schema and table config
+    addSchema(createSchema());
+    addTableConfig(createRealtimeTableConfig(avroFiles.get(0)));
+    addTableConfig(createOfflineTableConfig());
+
+    // Push data into Kafka
+    pushAvroIntoKafka(avroFiles);
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void tearDown()
+      throws Exception {
+    dropRealtimeTable(getTableName());
+    stopMinion();
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+    FileUtils.deleteDirectory(_tempDirTls);
+  }
+
+  @Override
+  public Map<String, Object> getDefaultControllerConfiguration() {
+    Map<String, Object> prop = super.getDefaultControllerConfiguration();
+    prop.put("controller.tls.keystore.path", _tlsStore.getAbsolutePath());
+    prop.put("controller.tls.keystore.password", "changeit");
+    prop.put("controller.tls.truststore.path", _tlsStore.getAbsolutePath());
+    prop.put("controller.tls.truststore.password", "changeit");
+
+    prop.remove("controller.port");
+    prop.put("controller.access.protocols", "https");
+    prop.put("controller.access.protocols.https.port", 
DEFAULT_CONTROLLER_PORT);
+    prop.put("controller.broker.protocol", "https");
+    prop.put("controller.vip.protocol", "https");
+    prop.put("controller.vip.port", DEFAULT_CONTROLLER_PORT);
+
+    return BasicAuthTestUtils.addControllerConfiguration(prop);
+  }
+
+  @Override
+  protected PinotConfiguration getDefaultBrokerConfiguration() {
+    Map<String, Object> prop = super.getDefaultBrokerConfiguration().toMap();
+    prop.put("pinot.broker.tls.keystore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.broker.tls.keystore.password", "changeit");
+    prop.put("pinot.broker.tls.truststore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.broker.tls.truststore.password", "changeit");
+
+    prop.put("pinot.broker.client.access.protocols", "https");
+    prop.put("pinot.broker.client.access.protocols.https.port", 
DEFAULT_BROKER_PORT);
+    prop.put("pinot.broker.nettytls.enabled", "true");
+
+    return BasicAuthTestUtils.addBrokerConfiguration(prop);
+  }
+
+  @Override
+  protected PinotConfiguration getDefaultServerConfiguration() {
+    Map<String, Object> prop = super.getDefaultServerConfiguration().toMap();
+    prop.put("pinot.server.tls.keystore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.server.tls.keystore.password", "changeit");
+    prop.put("pinot.server.tls.truststore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.server.tls.truststore.password", "changeit");
+
+    prop.put("pinot.server.adminapi.access.protocols", "https");
+    prop.put("pinot.server.adminapi.access.protocols.https.port", "7443");
+    prop.put("pinot.server.netty.enabled", "false");
+    prop.put("pinot.server.nettytls.enabled", "true");
+    prop.put("pinot.server.nettytls.port", "8089");
+    prop.put("pinot.server.segment.uploader.protocol", "https");
+
+    return BasicAuthTestUtils.addServerConfiguration(prop);
+  }
+
+  @Override
+  protected PinotConfiguration getDefaultMinionConfiguration() {
+    Map<String, Object> prop = super.getDefaultMinionConfiguration().toMap();
+    prop.put("pinot.minion.tls.keystore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.minion.tls.keystore.password", "changeit");
+    prop.put("pinot.minion.tls.truststore.path", _tlsStore.getAbsolutePath());
+    prop.put("pinot.minion.tls.truststore.password", "changeit");
+
+    return BasicAuthTestUtils.addMinionConfiguration(prop);
+  }
+
+  @Override
+  protected TableTaskConfig getTaskConfig() {
+    Map<String, String> prop = new HashMap<>();
+    prop.put("bucketTimePeriod", "30d");
+
+    return new 
TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
 prop));
+  }
+
+  @Override
+  protected boolean useLlc() {
+    return true;
+  }
+
+  @Override
+  protected void addSchema(Schema schema)
+      throws IOException {
+    PostMethod response =
+        
sendMultipartPostRequest(_controllerRequestURLBuilder.forSchemaCreate(), 
schema.toSingleLineJsonString(),
+            AUTH_HEADER);
+    Assert.assertEquals(response.getStatusCode(), 200);
+  }
+
+  @Override
+  protected void addTableConfig(TableConfig tableConfig)
+      throws IOException {
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), 
tableConfig.toJsonString(), AUTH_HEADER);
+  }
+
+  @Override
+  protected Connection getPinotConnection() {
+    if (_pinotConnection == null) {
+      JsonAsyncHttpPinotClientTransportFactory factory = new 
JsonAsyncHttpPinotClientTransportFactory();
+      factory.setHeaders(AUTH_HEADER);
+      factory.setScheme("https");
+      factory.setSslContext(FileUploadDownloadClient._defaultSSLContext);
+
+      _pinotConnection = ConnectionFactory.fromZookeeper(getZkUrl() + "/" + 
getHelixClusterName(), factory.buildTransport());
+    }
+    return _pinotConnection;
+  }
+
+  @Override
+  protected void dropRealtimeTable(String tableName)
+      throws IOException {
+    sendDeleteRequest(
+        
_controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName)),
+        AUTH_HEADER);
+  }
+
+  @Test
+  public void testSegmentUploadDownload()
+      throws Exception {
+    final Request query = new Request("sql", "SELECT count(*) FROM " + 
getTableName());
+
+    ResultSetGroup resultBeforeOffline = getPinotConnection().execute(query);
+    Assert.assertTrue(resultBeforeOffline.getResultSet(0).getLong(0) > 0);
+
+    // schedule offline segment generation
+    Assert.assertNotNull(_controllerStarter.getTaskManager().scheduleTasks());
+    Thread.sleep(5000);
+
+    ResultSetGroup resultAfterOffline = getPinotConnection().execute(query);
+
+    // Verify constant row count
+    Assert.assertEquals(resultBeforeOffline.getResultSet(0).getLong(0), 
resultAfterOffline.getResultSet(0).getLong(0));
+
+    // list offline segments
+    JsonNode segmentSets = JsonUtils
+        
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPI(getTableName()),
 AUTH_HEADER));
+    JsonNode offlineSegments =
+        new IntRange(0, 
segmentSets.size()).stream().map(segmentSets::get).filter(s -> s.has("OFFLINE"))
+            .map(s -> s.get("OFFLINE")).findFirst().get();
+    Assert.assertFalse(offlineSegments.isEmpty());
+
+    // download and sanity-check size of offline segment(s)
+    for (int i = 0; i < offlineSegments.size(); i++) {
+      String segment = offlineSegments.get(i).asText();
+      Assert.assertTrue(sendGetRequest(_controllerRequestURLBuilder
+          
.forSegmentDownload(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), 
segment), AUTH_HEADER)
+          .length() > 200000); // download segment
+    }
+  }
+
+  void prepareTlsStore()
+      throws Exception {
+    try (OutputStream os = new FileOutputStream(_tlsStore);
+        InputStream is = getClass().getResourceAsStream("/tlstest.jks")) {
+      Preconditions.checkNotNull(is, "tlstest.jks must be on the classpath");
+      IOUtils.copy(is, os);
+    }
+  }
+}
diff --git a/pinot-integration-tests/src/test/resources/tlstest.jks 
b/pinot-integration-tests/src/test/resources/tlstest.jks
new file mode 100644
index 0000000..9609d94
Binary files /dev/null and 
b/pinot-integration-tests/src/test/resources/tlstest.jks differ

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to