This is an automated email from the ASF dual-hosted git repository.
kbeedkar pushed a commit to branch rel/0.7.1
in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
The following commit(s) were added to refs/heads/rel/0.7.1 by this push:
new 3675bdbd Modified the JavaTextFileSource so that a remote file can be
loaded via the HTTPS (or HTTP) protocol
new 654848eb Merge pull request #348 from kamir/rel/0.7.1
3675bdbd is described below
commit 3675bdbd7034fc004b7c3253802fc3a77726f0c7
Author: Mirko Kämpf <[email protected]>
AuthorDate: Tue Oct 10 13:31:43 2023 +0200
Modified the JavaTextFileSource so that a remote file can be loaded via
the HTTPS (or HTTP) protocol
---
.../wayang/java/operators/JavaTextFileSource.java | 61 ++++++--
.../java/operators/JavaTextFileSourceTest.java | 156 +++++++++++++++++++++
.../src/test/resources/banking-tx-small.csv | 63 +++++++++
3 files changed, 271 insertions(+), 9 deletions(-)
diff --git
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
index 6d8577f6..fad925a1 100644
---
a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
+++
b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java
@@ -35,11 +35,17 @@ import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.*;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.stream.Stream;
/**
* This is execution operator implements the {@link TextFileSource}.
@@ -65,20 +71,57 @@ public class JavaTextFileSource extends TextFileSource
implements JavaExecutionO
ChannelInstance[] outputs,
JavaExecutor javaExecutor,
OptimizationContext.OperatorContext operatorContext) {
+
assert inputs.length == this.getNumInputs();
assert outputs.length == this.getNumOutputs();
- String url = this.getInputUrl().trim();
- FileSystem fs = FileSystems.getFileSystem(url).orElseThrow(
- () -> new WayangException(String.format("Cannot access file
system of %s.", url))
- );
+ String urlStr = this.getInputUrl().trim();
try {
- final InputStream inputStream = fs.open(url);
- Stream<String> lines = new BufferedReader(new
InputStreamReader(inputStream)).lines();
- ((StreamChannel.Instance) outputs[0]).accept(lines);
- } catch (IOException e) {
- throw new WayangException(String.format("Reading %s failed.",
url), e);
+
+ URL sourceUrl = new URL( urlStr );
+ String protocol = sourceUrl.getProtocol();
+
+ if ( protocol.startsWith("https") || protocol.startsWith("http")
) {
+ try {
+ HttpURLConnection connection = (HttpURLConnection)
sourceUrl.openConnection();
+ connection.setRequestMethod("GET");
+ // Check if the response code indicates success (HTTP
status code 200)
+ if (connection.getResponseCode() ==
HttpURLConnection.HTTP_OK) {
+ System.out.println(">>> Ready to stream the data from
URL: " + urlStr);
+ // Read the data line by line and process it in the
StreamChannel
+ Stream<String> lines2 = new BufferedReader(new
InputStreamReader(connection.getInputStream())).lines();
+ ((StreamChannel.Instance) outputs[0]).accept(lines2);
+ }
+ }
+ catch (IOException ioException) {
+ ioException.printStackTrace();
+ throw new WayangException(String.format("Reading from URL:
%s failed.", urlStr), ioException);
+ }
+ }
+ else if ( sourceUrl.getProtocol().startsWith("file") ) {
+
+ FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
+ () -> new WayangException(String.format("Cannot access
file system of %s. ", urlStr))
+ );
+
+ try {
+ final InputStream inputStream = fs.open(urlStr);
+ Stream<String> lines = new BufferedReader(new
InputStreamReader(inputStream)).lines();
+ ((StreamChannel.Instance) outputs[0]).accept(lines);
+ }
+ catch (IOException ioException) {
+ ioException.printStackTrace();
+ throw new WayangException(String.format("Reading from
FILE: %s failed.", urlStr), ioException);
+ }
+ }
+ else {
+ throw new WayangException(String.format("PROTOCOL NOT
SUPPORTED IN JavaTextFileSource. [%s] [%s] SUPPORTED ARE: (file|http|https)",
urlStr, protocol));
+ }
+ }
+ catch (MalformedURLException e) {
+ e.printStackTrace();
+ throw new WayangException(String.format("Provided URL is not a
valid URL: %s (MalformedURLException)", urlStr), e);
}
ExecutionLineageNode prepareLineageNode = new
ExecutionLineageNode(operatorContext);
diff --git
a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
new file mode 100644
index 00000000..304a209b
--- /dev/null
+++
b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.api.Job;
+import org.apache.wayang.core.function.TransformationDescriptor;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.OutputSlot;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.fs.LocalFileSystem;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test suite for {@link JavaTextFileSource}.
+ */
+public class JavaTextFileSourceTest extends JavaExecutionOperatorTestBase {
+
+    /** Connect/read timeout (ms) used when probing whether a remote test URL is available. */
+    private static final int PROBE_TIMEOUT_MILLIS = 2000;
+
+    /**
+     * Reads the bundled {@code banking-tx-small.csv} resource through a {@code file:} URL
+     * and checks that every line, including the header, is emitted.
+     */
+    @Test
+    public void testReadLocalFile() {
+        final String testFileName = "/banking-tx-small.csv";
+        final URL inputUrl = this.getClass().getResource(testFileName);
+        Assert.assertNotNull("Missing test resource " + testFileName, inputUrl);
+
+        final List<String> result = readAllLines(inputUrl.toString());
+        // 1 header line + 62 transaction rows.
+        Assert.assertEquals(63, result.size());
+    }
+
+    /**
+     * Reads a file over plain HTTP.
+     *
+     * <p>Requires a local HTTP server serving the project root on port 8000, e.g.
+     * <ul>
+     *   <li>Python 3: {@code python -m http.server}</li>
+     *   <li>Python 2: {@code python -m SimpleHTTPServer}</li>
+     * </ul>
+     * The test is skipped (not failed) when no such server answers, so regular builds stay green.
+     */
+    @Test
+    public void testReadRemoteFileHTTP() throws IOException {
+        final URL inputUrl = new URL("http://localhost:8000/LICENSE");
+        org.junit.Assume.assumeTrue("No HTTP server reachable at " + inputUrl, isReachable(inputUrl));
+
+        final List<String> result = readAllLines(inputUrl.toString());
+        Assert.assertEquals(225, result.size());
+    }
+
+    /**
+     * Reads a Turtle ({@code .ttl}) file over HTTPS from an external server.
+     *
+     * <p>The test is skipped (not failed) when the host cannot be reached, e.g. in offline builds.
+     */
+    @Test
+    public void testReadRemoteFileHTTPS() throws IOException {
+        final URL inputUrl = new URL(
+                "https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl");
+        org.junit.Assume.assumeTrue("Remote host not reachable: " + inputUrl, isReachable(inputUrl));
+
+        final List<String> result = readAllLines(inputUrl.toString());
+        Assert.assertEquals(23, result.size());
+    }
+
+    /**
+     * Runs a {@link JavaTextFileSource} for {@code url} and collects every line it emits.
+     *
+     * @param url the input URL handed to the source operator
+     * @return all emitted lines, in order
+     */
+    private List<String> readAllLines(String url) {
+        final JavaTextFileSource source = new JavaTextFileSource(url);
+        final JavaChannelInstance[] inputs = new JavaChannelInstance[]{};
+        final JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()};
+        evaluate(source, inputs, outputs);
+        return outputs[0].<String>provideStream().collect(Collectors.toList());
+    }
+
+    /**
+     * Probes {@code url} with an HTTP HEAD request.
+     *
+     * @param url an {@code http:} or {@code https:} URL
+     * @return {@code true} iff the server answers with status 200 within the probe timeout
+     */
+    private static boolean isReachable(URL url) {
+        try {
+            final java.net.HttpURLConnection connection = (java.net.HttpURLConnection) url.openConnection();
+            connection.setConnectTimeout(PROBE_TIMEOUT_MILLIS);
+            connection.setReadTimeout(PROBE_TIMEOUT_MILLIS);
+            connection.setRequestMethod("HEAD");
+            try {
+                return connection.getResponseCode() == java.net.HttpURLConnection.HTTP_OK;
+            } finally {
+                connection.disconnect();
+            }
+        } catch (IOException e) {
+            // Connection refused, unknown host, TLS failure, timeout, ... -> treat as unavailable.
+            return false;
+        }
+    }
+}
diff --git
a/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv
b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv
new file mode 100644
index 00000000..7b3757dd
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv
@@ -0,0 +1,63 @@
+Account ID,Date,Transaction Type,Description,Amount,Currency
+Account123,2023-08-01,Deposit,Salary Deposit,5000.00,USD
+Account123,2023-08-02,Withdrawal,ATM Withdrawal,100.00,USD
+Account123,2023-08-03,Transfer,Transfer to Savings,1000.00,USD
+Account123,2023-08-04,Payment,Electric Bill Payment,75.00,USD
+Account123,2023-08-05,Deposit,Dividend Payment,200.00,USD
+Account123,2023-08-06,Withdrawal,ATM Withdrawal,50.00,USD
+Account123,2023-08-07,Transfer,Transfer to Checking,300.00,USD
+Account123,2023-08-08,Payment,Internet Bill Payment,50.00,USD
+Account123,2023-08-09,Deposit,Bonus Deposit,1000.00,USD
+Account123,2023-08-10,Withdrawal,ATM Withdrawal,200.00,USD
+Account123,2023-08-11,Transfer,Transfer to Savings,500.00,USD
+Account123,2023-08-12,Payment,Gas Bill Payment,60.00,USD
+Account123,2023-08-13,Deposit,Interest Payment,150.00,USD
+Account123,2023-08-14,Withdrawal,ATM Withdrawal,75.00,USD
+Account123,2023-08-15,Transfer,Transfer to Checking,200.00,USD
+Account123,2023-08-16,Payment,Phone Bill Payment,40.00,USD
+Account123,2023-08-17,Deposit,Salary Deposit,5500.00,USD
+Account123,2023-08-18,Withdrawal,ATM Withdrawal,150.00,USD
+Account123,2023-08-19,Transfer,Transfer to Savings,1000.00,USD
+Account123,2023-08-20,Payment,Water Bill Payment,45.00,USD
+Account123,2023-08-21,Deposit,Dividend Payment,250.00,USD
+Account123,2023-08-22,Withdrawal,ATM Withdrawal,75.00,USD
+Account123,2023-08-23,Transfer,Transfer to Checking,400.00,USD
+Account123,2023-08-24,Payment,Electric Bill Payment,80.00,USD
+Account123,2023-08-25,Deposit,Bonus Deposit,1200.00,USD
+Account123,2023-08-26,Withdrawal,ATM Withdrawal,100.00,USD
+Account123,2023-08-27,Transfer,Transfer to Savings,700.00,USD
+Account123,2023-08-28,Payment,Internet Bill Payment,60.00,USD
+Account123,2023-08-29,Deposit,Interest Payment,180.00,USD
+Account123,2023-08-30,Withdrawal,ATM Withdrawal,50.00,USD
+Account123,2023-08-31,Transfer,Transfer to Checking,350.00,USD
+Account456,2023-08-01,Deposit,Salary Deposit,5500.00,USD
+Account456,2023-08-02,Withdrawal,ATM Withdrawal,200.00,USD
+Account456,2023-08-03,Transfer,Transfer to Savings,1500.00,USD
+Account456,2023-08-04,Payment,Electric Bill Payment,120.00,USD
+Account456,2023-08-05,Deposit,Dividend Payment,300.00,USD
+Account456,2023-08-06,Withdrawal,ATM Withdrawal,75.00,USD
+Account456,2023-08-07,Transfer,Transfer to Checking,400.00,USD
+Account456,2023-08-08,Payment,Internet Bill Payment,60.00,USD
+Account456,2023-08-09,Deposit,Bonus Deposit,1500.00,USD
+Account456,2023-08-10,Withdrawal,ATM Withdrawal,300.00,USD
+Account456,2023-08-11,Transfer,Transfer to Savings,1000.00,USD
+Account456,2023-08-12,Payment,Gas Bill Payment,80.00,USD
+Account456,2023-08-13,Deposit,Interest Payment,200.00,USD
+Account456,2023-08-14,Withdrawal,ATM Withdrawal,100.00,USD
+Account456,2023-08-15,Transfer,Transfer to Checking,500.00,USD
+Account456,2023-08-16,Payment,Phone Bill Payment,40.00,USD
+Account456,2023-08-17,Deposit,Salary Deposit,6000.00,USD
+Account456,2023-08-18,Withdrawal,ATM Withdrawal,250.00,USD
+Account456,2023-08-19,Transfer,Transfer to Savings,1200.00,USD
+Account456,2023-08-20,Payment,Water Bill Payment,60.00,USD
+Account456,2023-08-21,Deposit,Dividend Payment,350.00,USD
+Account456,2023-08-22,Withdrawal,ATM Withdrawal,100.00,USD
+Account456,2023-08-23,Transfer,Transfer to Checking,600.00,USD
+Account456,2023-08-24,Payment,Electric Bill Payment,100.00,USD
+Account456,2023-08-25,Deposit,Bonus Deposit,1800.00,USD
+Account456,2023-08-26,Withdrawal,ATM Withdrawal,150.00,USD
+Account456,2023-08-27,Transfer,Transfer to Savings,800.00,USD
+Account456,2023-08-28,Payment,Internet Bill Payment,80.00,USD
+Account456,2023-08-29,Deposit,Interest Payment,250.00,USD
+Account456,2023-08-30,Withdrawal,ATM Withdrawal,60.00,USD
+Account456,2023-08-31,Transfer,Transfer to Checking,700.00,USD
\ No newline at end of file