This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 77b48cd17f661b60c3d83be36a4eb6999ccce30d Merge: dc43ac8 c6b2941 Author: Michael Blow <[email protected]> AuthorDate: Sat Jan 11 13:12:33 2020 -0500 Merge branch 'gerrit/mad-hatter' Change-Id: If1c03edce783ccd249d90383da938132ae654886 .../apache/asterix/test/common/TestExecutor.java | 18 +++++--- .../asterix/test/sqlpp/ParserTestExecutor.java | 13 +++--- .../test/resources/runtimets/testsuite_sqlpp.xml | 1 + .../apache/hyracks/api/comm/NetworkAddress.java | 20 ++++++++- .../hyracks/api/context/IHyracksJobletContext.java | 2 +- .../hyracks/control/cc/cluster/NodeManager.java | 8 ++-- .../hyracks/control/cc/executor/JobExecutor.java | 2 +- .../control/cc/partitions/PartitionUtils.java | 2 +- .../hyracks/control/cc/work/RegisterNodeWork.java | 2 +- .../control/cc/cluster/NodeManagerTest.java | 6 +-- .../hyracks/control/common/NodeControllerData.java | 24 +++++------ .../common/controllers/NodeRegistration.java | 40 +++++++++--------- .../java/org/apache/hyracks/control/nc/Joblet.java | 18 +++++--- .../hyracks/control/nc/NodeControllerService.java | 8 ++-- .../java/org/apache/hyracks/control/nc/Task.java | 6 +-- .../dataflow/std/join/InMemoryHashJoin.java | 43 ++++++++++--------- .../join/InMemoryHashJoinOperatorDescriptor.java | 49 +++++++++------------- .../hyracks/dataflow/std/join/NestedLoopJoin.java | 45 ++++++++++---------- .../std/join/NestedLoopJoinOperatorDescriptor.java | 15 ++++--- .../dataflow/std/join/OptimizedHybridHashJoin.java | 43 ++++++++++--------- .../OptimizedHybridHashJoinOperatorDescriptor.java | 44 +++++++++---------- .../integration/OptimizedHybridHashJoinTest.java | 10 ++--- .../protocols/muxdemux/ChannelControlBlock.java | 2 +- .../hyracks/net/protocols/muxdemux/ChannelSet.java | 6 ++- .../hyracks/test/support/TestJobletContext.java | 29 +++++++------ .../hyracks/test/support/TestTaskContext.java | 4 +- .../org/apache/hyracks/util/file/FileUtil.java | 20 +++++++++ .../org/apache/hyracks/util/file/FileUtilTest.java | 23 ++++++++++ 28 files changed, 289 insertions(+), 214 deletions(-) diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 22719f6,a10dc54..7ee499e --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@@ -258,8 -237,8 +259,8 @@@ public class TestExecutor } public void runScriptAndCompareWithResult(File scriptFile, File expectedFile, File actualFile, - ComparisonEnum compare, Charset actualEncoding) throws Exception { + ComparisonEnum compare, Charset actualEncoding, String statement) throws Exception { - LOGGER.info("Expected results file: {} ", expectedFile); + LOGGER.info("Expected results file: {} ", canonicalize(expectedFile)); boolean regex = false; if (expectedFile.getName().endsWith(".ignore")) { return; //skip the comparison diff --cc asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java index 4826a99,c71cf31..0ab41b7 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/sqlpp/ParserTestExecutor.java @@@ -18,7 -18,7 +18,8 @@@ */ package org.apache.asterix.test.sqlpp; + import static org.apache.hyracks.util.file.FileUtil.canonicalize; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@@ -166,9 -164,9 +167,9 @@@ public class ParserTestExecutor extend writer.close(); // Compares the actual result and the expected result. runScriptAndCompareWithResult(queryFile, expectedFile, actualResultFile, ComparisonEnum.TEXT, - StandardCharsets.UTF_8); + StandardCharsets.UTF_8, null); } catch (Exception e) { - GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + queryFile); + GlobalConfig.ASTERIX_LOGGER.warn("Failed while testing file " + canonicalize(queryFile)); throw e; } finally { writer.close(); diff --cc asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml index b2335c5,cc5d18d..7a6fb94 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml @@@ -3362,19 -3314,6 +3362,20 @@@ <output-dir compare="Text">arrays</output-dir> </compilation-unit> </test-case> + <test-case FilePath="comparison"> + <compilation-unit name="circle-point"> + <output-dir compare="Text">circle-point</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="comparison" check-warnings="true"> + <compilation-unit name="incomparable_types"> + <output-dir compare="Text">incomparable_types</output-dir> + <expected-warn>Incomparable input types: string and bigint (in line 25, at column 13)</expected-warn> + <expected-warn>Incomparable input types: array and bigint (in line 23, at column 7)</expected-warn> + <expected-warn>Incomparable input types: point and point (in line 24, at column 18)</expected-warn> ++ <expected-warn>Incomparable input types: bigint and string (in line 24, at column 46)</expected-warn> + </compilation-unit> + </test-case> </test-group> <test-group name="constructor"> <test-case FilePath="constructor"> diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 9007628,9007628..f54a62c --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@@ -147,9 -147,9 +147,11 @@@ public class NodeManager implements INo @Override public Map<String, NodeControllerInfo> getNodeControllerInfoMap() { Map<String, NodeControllerInfo> result = new LinkedHashMap<>(); -- nodeRegistry.forEach( -- (key, ncState) -> result.put(key, new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataPort(), -- ncState.getResultPort(), ncState.getMessagingPort(), ncState.getCapacity().getCores()))); ++ nodeRegistry ++ .forEach((key, ncState) -> result.put(key, ++ new NodeControllerInfo(key, NodeStatus.ACTIVE, ncState.getDataAddress(), ++ ncState.getResultAddress(), ncState.getMessagingAddress(), ++ ncState.getCapacity().getCores()))); return result; } diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index bdc73d5,bdc73d5..ad55398 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@@ -415,7 -415,7 +415,7 @@@ public class JobExecutor for (int j = 0; j < inPartitionCounts[i]; ++j) { TaskId producerTaskId = new TaskId(producerAid, j); String nodeId = findTaskLocation(producerTaskId); -- partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataPort(); ++ partitionLocations[i][j] = nodeManager.getNodeControllerState(nodeId).getDataAddress(); } } tad.setInputPartitionLocations(partitionLocations); diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java index 65851ef,65851ef..41ff83d --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionUtils.java @@@ -38,7 -38,7 +38,7 @@@ public class PartitionUtils INodeManager nodeManager = ccs.getNodeManager(); NodeControllerState producerNCS = nodeManager.getNodeControllerState(desc.getNodeId()); NodeControllerState requestorNCS = nodeManager.getNodeControllerState(req.getNodeId()); -- final NetworkAddress dataport = producerNCS.getDataPort(); ++ final NetworkAddress dataport = producerNCS.getDataAddress(); final INodeController requestorNC = requestorNCS.getNodeController(); requestorNC.reportPartitionAvailability(pid, dataport); } diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index 0ea9239,eae13c6..b1700fe --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@@ -51,7 -51,7 +51,7 @@@ public class RegisterNodeWork extends S String id = reg.getNodeId(); LOGGER.info("registering node: {}", id); NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(), - ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress())); - ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerPort().resolveInetSocketAddress())); ++ ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress())); INodeManager nodeManager = ccs.getNodeManager(); NodeParameters params = new NodeParameters(); params.setClusterControllerInfo(ccs.getClusterControllerInfo()); diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java index d92727c,d92727c..5c85134 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/test/java/org/apache/hyracks/control/cc/cluster/NodeManagerTest.java @@@ -172,9 -172,9 +172,9 @@@ public class NodeManagerTest NetworkAddress resultAddr = new NetworkAddress(ipAddr, 1002); NetworkAddress msgAddr = new NetworkAddress(ipAddr, 1003); when(ncState.getCapacity()).thenReturn(new NodeCapacity(NODE_MEMORY_SIZE, NODE_CORES)); -- when(ncState.getDataPort()).thenReturn(dataAddr); -- when(ncState.getResultPort()).thenReturn(resultAddr); -- when(ncState.getMessagingPort()).thenReturn(msgAddr); ++ when(ncState.getDataAddress()).thenReturn(dataAddr); ++ when(ncState.getResultAddress()).thenReturn(resultAddr); ++ when(ncState.getMessagingAddress()).thenReturn(msgAddr); when(ncState.getConfig()) .thenReturn(Collections.singletonMap(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable(), ipAddr)); Mockito.when(ncState.getNodeController()).thenReturn(ncProxy); diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java index 24a3e57,24a3e57..90a4432 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/NodeControllerData.java @@@ -47,11 -47,11 +47,11 @@@ public class NodeControllerData private final Map<SerializedOption, Object> config; -- private final NetworkAddress dataPort; ++ private final NetworkAddress dataAddress; -- private final NetworkAddress resultPort; ++ private final NetworkAddress resultAddress; -- private final NetworkAddress messagingPort; ++ private final NetworkAddress messagingAddress; private final Set<JobId> activeJobIds; @@@ -151,9 -151,9 +151,9 @@@ nodeId = reg.getNodeId(); config = Collections.unmodifiableMap(reg.getConfig()); -- dataPort = reg.getDataPort(); -- resultPort = reg.getResultPort(); -- messagingPort = reg.getMessagingPort(); ++ dataAddress = reg.getDataAddress(); ++ resultAddress = reg.getResultAddress(); ++ messagingAddress = reg.getMessagingAddress(); activeJobIds = new HashSet<>(); osName = reg.getOSName(); @@@ -265,16 -265,16 +265,16 @@@ return activeJobIds; } -- public NetworkAddress getDataPort() { -- return dataPort; ++ public NetworkAddress getDataAddress() { ++ return dataAddress; } -- public NetworkAddress getResultPort() { -- return resultPort; ++ public NetworkAddress getResultAddress() { ++ return resultAddress; } -- public NetworkAddress getMessagingPort() { -- return messagingPort; ++ public NetworkAddress getMessagingAddress() { ++ return messagingAddress; } public NodeCapacity getCapacity() { diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java index 474bc0a,9929ac4..d85f53c --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/controllers/NodeRegistration.java @@@ -21,8 -21,9 +21,7 @@@ package org.apache.hyracks.control.comm import static org.apache.hyracks.util.MXHelper.osMXBean; import static org.apache.hyracks.util.MXHelper.runtimeMXBean; -import java.io.IOException; import java.io.Serializable; --import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; @@@ -35,17 -36,24 +34,19 @@@ import org.apache.hyracks.api.job.resou import org.apache.hyracks.control.common.heartbeat.HeartbeatSchema; import org.apache.hyracks.util.MXHelper; import org.apache.hyracks.util.PidHelper; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public final class NodeRegistration implements Serializable { -- private static final long serialVersionUID = 1L; - - private final InetSocketAddress ncAddress; ++ private static final long serialVersionUID = 2L; - private static final Logger LOGGER = LogManager.getLogger(); - - @Deprecated - private InetSocketAddress ncAddress; + private final String nodeId; - private final NetworkAddress dataPort; - private NetworkAddress ncPort; ++ private final NetworkAddress ncAddress; + - private final String nodeId; ++ private final NetworkAddress dataAddress; - private final NetworkAddress resultPort; - private final NetworkAddress dataPort; ++ private final NetworkAddress resultAddress; + - private final NetworkAddress resultPort; ++ private final NetworkAddress messagingAddress; private final String osName; @@@ -73,22 -81,22 +74,21 @@@ private final HeartbeatSchema hbSchema; -- private final NetworkAddress messagingPort; -- private final long pid; private final NodeCapacity capacity; private final HashMap<SerializedOption, Object> config; - public NodeRegistration(InetSocketAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataPort, - public NodeRegistration(NetworkAddress ncPort, String nodeId, NCConfig ncConfig, NetworkAddress dataPort, -- NetworkAddress resultPort, HeartbeatSchema hbSchema, NetworkAddress messagingPort, NodeCapacity capacity) { - this.ncPort = ncPort; ++ public NodeRegistration(NetworkAddress ncAddress, String nodeId, NCConfig ncConfig, NetworkAddress dataAddress, ++ NetworkAddress resultAddress, HeartbeatSchema hbSchema, NetworkAddress messagingAddress, ++ NodeCapacity capacity) { + this.ncAddress = ncAddress; this.nodeId = nodeId; -- this.dataPort = dataPort; -- this.resultPort = resultPort; ++ this.dataAddress = dataAddress; ++ this.resultAddress = resultAddress; this.hbSchema = hbSchema; -- this.messagingPort = messagingPort; ++ this.messagingAddress = messagingAddress; this.capacity = capacity; this.osName = osMXBean.getName(); this.arch = osMXBean.getArch(); @@@ -110,8 -118,8 +110,8 @@@ } } - public InetSocketAddress getNodeControllerAddress() { - public NetworkAddress getNodeControllerPort() { - return ncPort; ++ public NetworkAddress getNodeControllerAddress() { + return ncAddress; } public String getNodeId() { @@@ -126,12 -134,12 +126,12 @@@ return config; } -- public NetworkAddress getDataPort() { -- return dataPort; ++ public NetworkAddress getDataAddress() { ++ return dataAddress; } -- public NetworkAddress getResultPort() { -- return resultPort; ++ public NetworkAddress getResultAddress() { ++ return resultAddress; } public String getOSName() { @@@ -186,8 -194,8 +186,8 @@@ return systemProperties; } -- public NetworkAddress getMessagingPort() { -- return messagingPort; ++ public NetworkAddress getMessagingAddress() { ++ return messagingAddress; } public long getPid() { diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index bf07c20,d2ff66e..ac2ef83 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@@ -431,7 -431,7 +431,7 @@@ public class NodeControllerService impl NodeParameters nodeParameters = ccc.getNodeParameters(); // Start heartbeat generator. heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(), - nodeRegistration.getNodeControllerAddress())); - nodeRegistration.getNodeControllerPort().resolveInetSocketAddress())); ++ nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress())); if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) { Timer ccTimer = new Timer("Timer-" + ccId, true); // Schedule profile dump generator. diff --cc hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java index 6fb55ec,9c28c61..dba21d6 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java @@@ -430,9 -428,9 +428,10 @@@ public class OptimizedHybridHashJoin } } - public void initProbe() { + public void initProbe(ITuplePairComparator comparator) { probePSizeInTups = new int[numOfPartitions]; + inMemJoiner.setComparator(comparator); + bufferManager.setConstrain(VPartitionTupleBufferManager.NO_CONSTRAIN); } public void probe(ByteBuffer buffer, IFrameWriter writer) throws HyracksDataException {
