This is an automated email from the ASF dual-hosted git repository. ming pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push: new 42f9a7638 refactor: adjust the related filters of sofa-bolt (#2735) 42f9a7638 is described below commit 42f9a7638ba0d3d75744c69690938c4ca1074e56 Author: haohao0103 <956322...@qq.com> AuthorDate: Wed Apr 2 17:25:46 2025 +0800 refactor: adjust the related filters of sofa-bolt (#2735) * 优化调整sofa-bolt相关filter * 优化调整sofa-bolt相关filter * 优化调整sofa-bolt相关filter * Update hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * Update hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * tiny improve * fix wrong usage in log --------- Co-authored-by: imbajin <j...@apache.org> Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> --- .../org/apache/hugegraph/pd/raft/KVOperation.java | 3 + .../org/apache/hugegraph/pd/raft/RaftEngine.java | 37 ++++- .../apache/hugegraph/pd/raft/RaftStateMachine.java | 12 +- .../hugegraph/pd/raft/auth/IpAuthHandler.java | 84 +++++++++++ .../HugegraphHessianSerializerFactory.java | 156 +++++++++++++++++++++ 5 files changed, 282 insertions(+), 10 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java index b27252fa1..6afc6d6e9 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/KVOperation.java @@ -29,6 +29,8 @@ import com.caucho.hessian.io.Hessian2Output; import lombok.Data; +import org.apache.hugegraph.pd.raft.serializer.HugegraphHessianSerializerFactory; + @Data public class KVOperation { @@ -84,6 +86,7 @@ public class KVOperation { try (ByteArrayInputStream bis = new ByteArrayInputStream(value, 1, value.length - 1)) { Hessian2Input input = new Hessian2Input(bis); + input.setSerializerFactory(HugegraphHessianSerializerFactory.getInstance()); KVOperation op = new KVOperation(); op.op = value[0]; op.key = input.readBytes(); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index 67734d145..60ea38483 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -19,6 +19,7 @@ package org.apache.hugegraph.pd.raft; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -26,12 +27,16 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.apache.hugegraph.pd.common.PDException; import org.apache.hugegraph.pd.config.PDConfig; import org.apache.hugegraph.pd.grpc.Metapb; import org.apache.hugegraph.pd.grpc.Pdpb; +import org.apache.hugegraph.pd.raft.auth.IpAuthHandler; +import com.alipay.remoting.ExtendedNettyChannelHandler; +import com.alipay.remoting.config.BoltServerOption; import com.alipay.sofa.jraft.JRaftUtils; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.RaftGroupService; @@ -47,10 +52,12 @@ import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.option.RpcOptions; import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; import com.alipay.sofa.jraft.util.Endpoint; import com.alipay.sofa.jraft.util.ThreadId; import com.alipay.sofa.jraft.util.internal.ThrowUtil; +import io.netty.channel.ChannelHandler; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -117,7 +124,7 @@ public class RaftEngine { final PeerId serverId = JRaftUtils.getPeerId(config.getAddress()); - rpcServer = createRaftRpcServer(config.getAddress()); + rpcServer = createRaftRpcServer(config.getAddress(), initConf.getPeers()); // construct raft group and start raft this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer, true); @@ -130,14 +137,40 @@ public class RaftEngine { /** * Create a Raft RPC Server for communication between PDs */ - private RpcServer createRaftRpcServer(String raftAddr) { + private RpcServer createRaftRpcServer(String raftAddr, List<PeerId> peers) { Endpoint endpoint = JRaftUtils.getEndPoint(raftAddr); RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(endpoint); + configureRaftServerIpWhitelist(peers, rpcServer); RaftRpcProcessor.registerProcessor(rpcServer, this); rpcServer.init(null); return rpcServer; } + private static void configureRaftServerIpWhitelist(List<PeerId> peers, RpcServer rpcServer) { + if (rpcServer instanceof BoltRpcServer) { + ((BoltRpcServer) rpcServer).getServer().option( + BoltServerOption.EXTENDED_NETTY_CHANNEL_HANDLER, + new ExtendedNettyChannelHandler() { + @Override + public List<ChannelHandler> frontChannelHandlers() { + return Collections.singletonList( + IpAuthHandler.getInstance( + peers.stream() + .map(PeerId::getIp) + .collect(Collectors.toSet()) + ) + ); + } + + @Override + public List<ChannelHandler> backChannelHandlers() { + return Collections.emptyList(); + } + } + ); + } + } + public void shutDown() { if (this.raftGroupService != null) { this.raftGroupService.shutdown(); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java index ec773ac6f..c7537d30a 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java @@ -90,7 +90,7 @@ public class RaftStateMachine extends StateMachineAdapter { done.run(Status.OK()); } } catch (Throwable t) { - log.error("StateMachine meet critical error: {}.", t); + log.error("StateMachine encountered critical error", t); if (done != null) { done.run(new Status(RaftError.EINTERNAL, t.getMessage())); } @@ -101,7 +101,7 @@ public class RaftStateMachine extends StateMachineAdapter { @Override public void onError(final RaftException e) { - log.error("Raft StateMachine on error {}", e); + log.error("Raft StateMachine encountered an error", e); } @Override @@ -117,9 +117,7 @@ public class RaftStateMachine extends StateMachineAdapter { log.info("Raft becomes leader"); Utils.runInThread(() -> { if (!CollectionUtils.isEmpty(stateListeners)) { - stateListeners.forEach(listener -> { - listener.onRaftLeaderChanged(); - }); + stateListeners.forEach(RaftStateListener::onRaftLeaderChanged); } }); } @@ -136,9 +134,7 @@ public class RaftStateMachine extends StateMachineAdapter { super.onStartFollowing(ctx); Utils.runInThread(() -> { if (!CollectionUtils.isEmpty(stateListeners)) { - stateListeners.forEach(listener -> { - listener.onRaftLeaderChanged(); - }); + stateListeners.forEach(RaftStateListener::onRaftLeaderChanged); } }); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java new file mode 100644 index 000000000..2ac384541 --- /dev/null +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/auth/IpAuthHandler.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.raft.auth; + +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.Set; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@ChannelHandler.Sharable +public class IpAuthHandler extends ChannelDuplexHandler { + + private final Set<String> allowedIps; + private static volatile IpAuthHandler instance; + + private IpAuthHandler(Set<String> allowedIps) { + this.allowedIps = Collections.unmodifiableSet(allowedIps); + } + + public static IpAuthHandler getInstance(Set<String> allowedIps) { + if (instance == null) { + synchronized (IpAuthHandler.class) { + if (instance == null) { + instance = new IpAuthHandler(allowedIps); + } + } + } + return instance; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + String clientIp = getClientIp(ctx); + if (!isIpAllowed(clientIp)) { + log.warn("Blocked connection from {}", clientIp); + ctx.close(); + return; + } + super.channelActive(ctx); + } + + private static String getClientIp(ChannelHandlerContext ctx) { + InetSocketAddress remoteAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + return remoteAddress.getAddress().getHostAddress(); + } + + private boolean isIpAllowed(String ip) { + return allowedIps.isEmpty() || allowedIps.contains(ip); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + String clientIp = getClientIp(ctx); + log.warn("Client : {} connection exception : {}", clientIp, cause); + if (ctx.channel().isActive()) { + ctx.close().addListener(future -> { + if (!future.isSuccess()) { + log.warn("Client: {} connection closed failed: {}", + clientIp, future.cause().getMessage()); + } + }); + } + } +} diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java new file mode 100644 index 000000000..275159a50 --- /dev/null +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/serializer/HugegraphHessianSerializerFactory.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hugegraph.pd.raft.serializer; + +import com.caucho.hessian.io.Deserializer; +import com.caucho.hessian.io.HessianProtocolException; +import com.caucho.hessian.io.Serializer; +import com.caucho.hessian.io.SerializerFactory; + + +import lombok.extern.slf4j.Slf4j; + +import java.text.SimpleDateFormat; +import java.time.format.DateTimeFormatter; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +@Slf4j +public class HugegraphHessianSerializerFactory extends SerializerFactory { + + private static final HugegraphHessianSerializerFactory INSTANCE = new HugegraphHessianSerializerFactory(); + + private HugegraphHessianSerializerFactory() { + super(); + initWhitelist(); + } + + public static HugegraphHessianSerializerFactory getInstance() { + return INSTANCE; + } + + private final Set<String> whitelist = new HashSet<>(); + + private void initWhitelist() { + allowBasicType(); + allowCollections(); + allowConcurrent(); + allowTime(); + allowBusinessClasses(); + } + + private void allowBasicType() { + addToWhitelist( + boolean.class, byte.class, char.class, double.class, + float.class, int.class, long.class, short.class, + Boolean.class, Byte.class, Character.class, Double.class, + Float.class, Integer.class, Long.class, Short.class, + String.class, Class.class, Number.class + ); + } + + private void allowCollections() { + addToWhitelist( + List.class, ArrayList.class, LinkedList.class, + Set.class, HashSet.class, LinkedHashSet.class, TreeSet.class, + Map.class, HashMap.class, LinkedHashMap.class, TreeMap.class + ); + } + + private void allowConcurrent() { + addToWhitelist( + AtomicBoolean.class, AtomicInteger.class, AtomicLong.class, AtomicReference.class, + ConcurrentMap.class, ConcurrentHashMap.class, ConcurrentSkipListMap.class, CopyOnWriteArrayList.class + ); + } + + private void allowTime() { + addToWhitelist( + Date.class, Calendar.class, TimeUnit.class, + SimpleDateFormat.class, DateTimeFormatter.class + ); + tryAddClass("java.time.LocalDate"); + tryAddClass("java.time.LocalDateTime"); + tryAddClass("java.time.Instant"); + } + + private void allowBusinessClasses() { + addToWhitelist( + org.apache.hugegraph.pd.raft.KVOperation.class, + byte[].class + ); + } + + private void addToWhitelist(Class<?>... classes) { + for (Class<?> clazz : classes) { + whitelist.add(clazz.getName()); + } + } + + private void tryAddClass(String className) { + try { + Class.forName(className); + whitelist.add(className); + } catch (ClassNotFoundException e) { + log.warn("Failed to load class {}", className); + } + } + + @Override + public Serializer getSerializer(Class cl) throws HessianProtocolException { + checkWhitelist(cl); + return super.getSerializer(cl); + } + + @Override + public Deserializer getDeserializer(Class cl) throws HessianProtocolException { + checkWhitelist(cl); + return super.getDeserializer(cl); + } + + private void checkWhitelist(Class cl) { + String className = cl.getName(); + if (!whitelist.contains(className)) { + log.warn("Security alert: Blocked unauthorized class [{}] at {}", + className, new Date()); + throw new SecurityException("hessian serialize unauthorized class: " + className); + } + } +}