This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9c022e3 [Bug] Spark doris connector http v2 authentication fails, and
HTTP v2 interface returns json nesting problem (#5366)
9c022e3 is described below
commit 9c022e37648e75f9b46bd59bbfeaf1f190e3fd96
Author: 张家锋 <[email protected]>
AuthorDate: Sun Feb 7 09:28:55 2021 +0800
[Bug] Spark doris connector http v2 authentication fails, and HTTP v2
interface returns json nesting problem (#5366)
1. Handle the inconsistent response data formats returned by HTTP v1
and v2.
2. Fix the user authentication failure.
---
.../org/apache/doris/spark/rest/RestService.java | 67 +++++++++++++++++-----
.../doris/spark/sql/SparkDorisConnector.scala | 27 +++++++++
2 files changed, 79 insertions(+), 15 deletions(-)
diff --git
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 2404507..3c8249c 100644
---
a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++
b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -32,17 +32,23 @@ import static
org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESS
import static
org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Base64;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
import java.util.Set;
+import java.util.HashSet;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
@@ -113,32 +119,36 @@ public class RestService implements Serializable {
String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, "");
String password = cfg.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
- CredentialsProvider credentialsProvider = new
BasicCredentialsProvider();
- credentialsProvider.setCredentials(
- AuthScope.ANY,
- new UsernamePasswordCredentials(user, password));
- HttpClientContext context = HttpClientContext.create();
- context.setCredentialsProvider(credentialsProvider);
logger.info("Send request to Doris FE '{}' with user '{}'.",
request.getURI(), user);
IOException ex = null;
int statusCode = -1;
for (int attempt = 0; attempt < retries; attempt++) {
- CloseableHttpClient httpClient = HttpClients.createDefault();
logger.debug("Attempt {} to request {}.", attempt,
request.getURI());
try {
- CloseableHttpResponse response = httpClient.execute(request,
context);
- statusCode = response.getStatusLine().getStatusCode();
+ HttpURLConnection conn = getConnection(request, user,
password);
+ statusCode = conn.getResponseCode();
if (statusCode != HttpStatus.SC_OK) {
logger.warn("Failed to get response from Doris FE {}, http
code is {}",
request.getURI(), statusCode);
continue;
}
- String res = EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
+ InputStream stream = (InputStream) conn.getContent();
+ String res = IOUtils.toString(stream);
logger.trace("Success get response from Doris FE: {}, response
is: {}.",
request.getURI(), res);
- return res;
+
+ ObjectMapper mapper = new ObjectMapper();
+
+ Map map = mapper.readValue(res, Map.class);
+ //Handle the problem of inconsistent data format returned by
http v1 and v2
+ if(map.containsKey("code") && map.containsKey("msg")) {
+ Object data = map.get("data");
+ return mapper.writeValueAsString(data);
+ } else {
+ return res;
+ }
} catch (IOException e) {
ex = e;
logger.warn(CONNECT_FAILED_MESSAGE, request.getURI(), e);
@@ -149,6 +159,33 @@ public class RestService implements Serializable {
throw new ConnectedFailedException(request.getURI().toString(),
statusCode, ex);
}
+
+ /**
+ * Open a POST connection to the request's URI with an HTTP Basic Authorization header and send the request entity as the body.
+ * @param request supplies the target URI and the body entity; it is cast to HttpPost, so any other request type fails at that cast
+ * @param user user name placed before the colon in the Basic credentials
+ * @param passwd password placed after the colon in the Basic credentials (encoded together with the user as UTF-8, then Base64)
+ * @return the connection, with redirects disabled, after the body has been printed and flushed to its output stream
+ * @throws IOException if the URL is malformed, the connection or its output stream cannot be opened, or the request entity cannot be read
+ */
+ private static HttpURLConnection getConnection(HttpRequestBase request,
String user, String passwd) throws IOException {
+ URL url = new URL(request.getURI().toString());
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setInstanceFollowRedirects(false);
+ conn.setRequestMethod("POST");
+ String authEncoding =
Base64.getEncoder().encodeToString(String.format("%s:%s", user,
passwd).getBytes(StandardCharsets.UTF_8));
+ conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+
+ InputStream content = ((HttpPost) request).getEntity().getContent();
+ String s = IOUtils.toString(content);
+
+ conn.setDoOutput(true);
+ conn.setDoInput(true);
+ PrintWriter out = new PrintWriter(conn.getOutputStream());
+ out.print(s);
+ out.flush();
+ return conn;
+ }
/**
* parse table identifier to array.
* @param tableIdentifier table identifier string
diff --git
a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
new file mode 100644
index 0000000..6d299d3
--- /dev/null
+++
b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/SparkDorisConnector.scala
@@ -0,0 +1,27 @@
+package org.apache.doris.spark.sql
+
+import org.apache.spark.{SparkConf, SparkContext}
+
+// Manual example: starts a local SparkContext, builds a dorisRDD for one table, prints each element, then stops the context.
+// NOTE(review): "feip:8030" and "db.table1" look like placeholders - confirm real FE address and table before running.
+object SparkDorisConnector {
+
+ def main(args: Array[String]): Unit = {
+ val sparkConf: SparkConf = new
SparkConf().setAppName("SparkDorisConnector").setMaster("local[*]")
+ val sc = new SparkContext(sparkConf)
+ sc.setLogLevel("DEBUG")
+ import org.apache.doris.spark._
+ val dorisSparkRDD = sc.dorisRDD(
+ tableIdentifier = Some("db.table1"),
+ cfg = Some(Map(
+ "doris.fenodes" -> "feip:8030",
+ "doris.request.auth.user" -> "root",
+ "doris.request.auth.password" -> ""
+ ))
+ )
+
+ dorisSparkRDD.map(println(_)).count()
+ sc.stop()
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]